/**
 * Encodes a test file with a (stripe 10, parity 4) Reed-Solomon encoder and
 * checks the length of the parity file that is produced.
 */
public void testEncoder() throws Exception {
  mySetup();
  final int stripeSize = 10;
  final int paritySize = 4;
  final long blockSize = 8192;
  final Path srcFile = new Path("/user/raidtest/file1");
  final Path parityFile = new Path("/rsraid/user/raidtest/file1");
  // The value returned by the helper is not needed by this test.
  TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcFile, 1, 25, blockSize);
  try {
    final short parityRepl = 1;
    final ReedSolomonEncoder rsEncoder =
        new ReedSolomonEncoder(conf, stripeSize, paritySize);
    rsEncoder.encodeFile(fileSys, srcFile, fileSys, parityFile, parityRepl,
        Reporter.NULL);
    // Expected parity length: 4 * 8192 * 3 bytes.
    final long parityLen = fileSys.getFileStatus(parityFile).getLen();
    assertEquals(4*8192*3, parityLen);
  } finally {
    myTearDown();
  }
}
/**
 * Creates a HAR-suffixed directory under the destination raid path and then
 * renames the source tree; the rename is expected to throw an IOException
 * whose message contains "HAR dir".
 */
public void testRenameHar() throws Exception {
  try {
    mySetup("xor", 1);
    final Path[] sources = new Path[] {
        new Path ("/user/dikang/raidtest/rename/f1"),
        new Path ("/user/dikang/raidtest/rename/f2"),
        new Path ("/user/dikang/raidtest/rename/f3")};
    final Path harParent = new Path ("/destraid/user/dikang/raidtest/rename");
    final DistributedRaidFileSystem raidFs = getRaidFS();
    for (int i = 0; i < sources.length; i++) {
      TestRaidDfs.createTestFilePartialLastBlock(fileSys, sources[i], 1, 8, 8192L);
    }
    raidFs.mkdirs(harParent);
    final Path harDir = new Path(harParent, "rename" + RaidNode.HAR_SUFFIX);
    raidFs.mkdirs(harDir);
    raidFs.rename(new Path("/user/dikang/raidtest"),
        new Path("/user/dikang/raidtest1"));
    // Reaching this point means the rename was not rejected.
    fail("Expected fail for HAR rename");
  } catch (IOException ie) {
    assertTrue(ie.getMessage().contains("HAR dir"));
  } finally {
    stopCluster();
  }
}
public void testRename() throws Exception { try { long[] crcs = new long[3]; int[] seeds = new int[3]; short repl = 1; Path dirPath = new Path("/user/dikang/raidtest"); mySetup(); DistributedRaidFileSystem raidFs = getRaidFS(); Path[] files = TestRaidDfs.createTestFiles(dirPath, fileSizes, blockSizes, crcs, seeds, fileSys, (short)1); FileStatus stat = raidFs.getFileStatus(dirPath); Codec codec = Codec.getCodec("dir-rs"); RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, repl, repl); Path destPath = new Path("/user/dikang/raidtest_new"); assertTrue(raidFs.exists(dirPath)); assertFalse(raidFs.exists(destPath)); ParityFilePair parity = ParityFilePair.getParityFile( codec, stat, conf); Path srcParityPath = parity.getPath(); assertTrue(raidFs.exists(srcParityPath)); // do the rename file assertTrue(raidFs.rename(dirPath, destPath)); // verify the results. assertFalse(raidFs.exists(dirPath)); assertTrue(raidFs.exists(destPath)); assertFalse(raidFs.exists(srcParityPath)); FileStatus srcDest = raidFs.getFileStatus(destPath); parity = ParityFilePair.getParityFile(codec, srcDest, conf); assertTrue(raidFs.exists(parity.getPath())); } finally { stopCluster(); } }
public void testDeleteOneFile() throws Exception { try { long[] crcs = new long[3]; int[] seeds = new int[3]; short repl = 1; Path dirPath = new Path("/user/dikang/raidtest"); mySetup(); DistributedRaidFileSystem raidFs = getRaidFS(); Path[] files = TestRaidDfs.createTestFiles(dirPath, fileSizes, blockSizes, crcs, seeds, fileSys, (short)1); FileStatus stat = raidFs.getFileStatus(dirPath); Codec codec = Codec.getCodec("dir-rs"); RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, repl, repl); ParityFilePair parity = ParityFilePair.getParityFile( codec, stat, conf); Path srcParityPath = parity.getPath(); assertTrue(raidFs.exists(srcParityPath)); // delete one file assertTrue(raidFs.delete(files[0])); // verify the results assertFalse(raidFs.exists(files[0])); // we still have the parity file assertTrue(raidFs.exists(srcParityPath)); // delete the left files assertTrue(raidFs.delete(files[1])); assertTrue(raidFs.delete(files[2])); // we will not touch the parity file. assertTrue(raidFs.exists(srcParityPath)); } finally { stopCluster(); } }
public void testDeleteDirRaidedFile() throws Exception { long[] crcs = new long[3]; int[] seeds = new int[3]; short repl = 1; Path dirPath = new Path("/user/dikang/raidtest"); mySetup(); //disable trash conf.setInt("fs.trash.interval", 0); DistributedRaidFileSystem raidFs = getRaidFS(); Path[] files = TestRaidDfs.createTestFiles(dirPath, fileSizes, blockSizes, crcs, seeds, fileSys, (short)1); FileStatus stat = raidFs.getFileStatus(dirPath); Codec codec = Codec.getCodec("dir-rs"); RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, repl, repl); try { raidFs.delete(files[0]); fail(); } catch (Exception ex) { LOG.warn("Excepted error: " + ex.getMessage(), ex); } finally { stopCluster(); } }
/** * removes a specified block from MiniDFS storage and reports it as corrupt */ private void removeAndReportBlock(DistributedFileSystem blockDfs, Path filePath, LocatedBlock block) throws IOException { TestRaidDfs.corruptBlock(filePath, block.getBlock(), NUM_DATANODES, true, cluster); // report deleted block to the name node LocatedBlock[] toReport = { block }; blockDfs.getClient().namenode.reportBadBlocks(toReport); }
public void testVerifySourceFile() throws Exception { mySetup(); try { Path srcPath = new Path("/user/dikang/raidtest/file0"); int numBlocks = 8; TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, numBlocks, 8192L); assertTrue(fileSys.exists(srcPath)); Codec codec = Codec.getCodec("rs"); FileStatus stat = fileSys.getFileStatus(srcPath); // verify good file assertTrue(FastFileCheck.checkFile(conf, (DistributedFileSystem)fileSys, fileSys, srcPath, null, codec, RaidUtils.NULL_PROGRESSABLE, true)); // verify bad file LocatedBlocks fileLoc = RaidDFSUtil.getBlockLocations((DistributedFileSystem)fileSys, srcPath.toUri().getPath(), 0, stat.getLen()); // corrupt file1 Random rand = new Random(); int idx = rand.nextInt(numBlocks); TestRaidDfs.corruptBlock(srcPath, fileLoc.getLocatedBlocks().get(idx).getBlock(), NUM_DATANODES, true, dfs); assertFalse(FastFileCheck.checkFile(conf, (DistributedFileSystem)fileSys, fileSys, srcPath, null, codec, RaidUtils.NULL_PROGRESSABLE, true)); } finally { stopCluster(); } }
public void testVerifyFile() throws Exception { mySetup(); try { Path srcPath = new Path("/user/dikang/raidtest/file0"); TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L); assertTrue(fileSys.exists(srcPath)); Codec codec = Codec.getCodec("rs"); // generate the parity files. doRaid(srcPath, codec); FileStatus stat = fileSys.getFileStatus(srcPath); // verify the GOOD_FILE ParityFilePair pfPair = ParityFilePair.getParityFile(codec, stat, conf); assertNotNull(pfPair); assertTrue(FastFileCheck.checkFile(conf, (DistributedFileSystem)fileSys, fileSys, srcPath, pfPair.getPath(), codec, RaidUtils.NULL_PROGRESSABLE, false)); // verify the BAD_FILE fileSys.delete(srcPath); TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L); fileSys.setTimes(pfPair.getPath(), fileSys.getFileStatus(srcPath).getModificationTime(), -1); stat = fileSys.getFileStatus(srcPath); pfPair = ParityFilePair.getParityFile(codec, stat, conf); assertNotNull(pfPair); assertFalse(FastFileCheck.checkFile(conf, (DistributedFileSystem)fileSys, fileSys, srcPath, pfPair.getPath(), codec, RaidUtils.NULL_PROGRESSABLE, false)); } finally { stopCluster(); } }
/** * creates a MiniDFS instance with a raided file in it */ public void setUpCluster(int rsPairtyLength, long[] fileSizes, long[] blockSizes) throws IOException, ClassNotFoundException { new File(TEST_DIR).mkdirs(); // Make sure data directory exists conf = new Configuration(); Utils.loadTestCodecs(conf, STRIPE_BLOCKS, STRIPE_BLOCKS, 1, rsPairtyLength, "/destraid", "/destraidrs", false, true); conf.setBoolean("dfs.permissions", false); cluster = new MiniDFSCluster(conf, NUM_DATANODES, true, null); cluster.waitActive(); dfs = (DistributedFileSystem) cluster.getFileSystem(); TestDirectoryRaidDfs.setupStripeStore(conf, dfs); String namenode = dfs.getUri().toString(); FileSystem.setDefaultUri(conf, namenode); Codec dirRS = Codec.getCodec("rs"); long[] crcs = new long[fileSizes.length]; int[] seeds = new int[fileSizes.length]; files = TestRaidDfs.createTestFiles(srcDir, fileSizes, blockSizes, crcs, seeds, (FileSystem)dfs, (short)1); assertTrue(RaidNode.doRaid(conf, dfs.getFileStatus(srcDir), new Path(dirRS.parityDirectory), dirRS, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, 1, 1)); srcStats = new FileStatus[files.length]; for (int i = 0 ; i < files.length; i++) { srcStats[i] = dfs.getFileStatus(files[i]); } parityStat = dfs.getFileStatus(parityFile); clientConf = new Configuration(conf); clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem"); clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); // prepare shell and arguments shell = new RaidShell(clientConf); args = new String[2]; args[0] = "-fsck"; args[1] = "/"; }
@Test public void testDirXORChooseReplicasToDeletePerformance() throws Exception { try { setupCluster(true, 1L, racks1, hosts1); // create test files int numFiles = 1000; long blockSize = 1024L; String parentDir = "/dir/"; for (int i = 0; i < numFiles; i++) { String file = parentDir + "file" + i; TestRaidDfs.createTestFile(fs, new Path(file), 3, 1, blockSize); } LOG.info("Created " + numFiles + " files"); Codec code = Codec.getCodec("xor"); FSNamesystem fsNameSys = cluster.getNameNode().namesystem; for (DatanodeDescriptor dd: fsNameSys.datanodeMap.values()) { LOG.info(dd); } // create fake parity file long numStripes = RaidNode.numStripes(numFiles, code.stripeLength); TestRaidDfs.createTestFile(fs, new Path(code.parityDirectory, "dir"), 3, (int)numStripes * code.parityLength, blockSize); long startTime = System.currentTimeMillis(); long total = 0L; fsNameSys.readLock(); for (BlocksMap.BlockInfo bi : fsNameSys.blocksMap.getBlocks()) { fsNameSys.replicator.chooseReplicaToDelete(bi.getINode(), bi, (short)3, fsNameSys.datanodeMap.values(), new ArrayList<DatanodeDescriptor>()); total++; } fsNameSys.readUnlock(); LOG.info("Average chooseReplicaToDelete time: " + ((double)(System.currentTimeMillis() - startTime) / total)); } finally { closeCluster(); } }
/** * creates a MiniDFS instance with a raided file in it */ public void setUpCluster(int rsPairtyLength) throws IOException, ClassNotFoundException { new File(TEST_DIR).mkdirs(); // Make sure data directory exists conf = new Configuration(); Utils.loadTestCodecs(conf, STRIPE_BLOCKS, STRIPE_BLOCKS, 1, rsPairtyLength, "/destraid", "/destraidrs", false, true); conf.setBoolean("dfs.permissions", false); cluster = new MiniDFSCluster(conf, NUM_DATANODES, true, null); cluster.waitActive(); dfs = (DistributedFileSystem) cluster.getFileSystem(); String namenode = dfs.getUri().toString(); FileSystem.setDefaultUri(conf, namenode); Codec dirRS = Codec.getCodec("rs"); long[] crcs = new long[fileSizes.length]; int[] seeds = new int[fileSizes.length]; files = TestRaidDfs.createTestFiles(srcDir, fileSizes, blockSizes, crcs, seeds, (FileSystem)dfs, (short)1); assertTrue(RaidNode.doRaid(conf, dfs.getFileStatus(srcDir), new Path(dirRS.parityDirectory), dirRS, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, 1, 1)); srcStats = new FileStatus[files.length]; for (int i = 0 ; i < files.length; i++) { srcStats[i] = dfs.getFileStatus(files[i]); } parityStat = dfs.getFileStatus(parityFile); clientConf = new Configuration(conf); clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem"); clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); // prepare shell and arguments shell = new RaidShell(clientConf); args = new String[2]; args[0] = "-fsck"; args[1] = "/"; }
/** * removes a specified block from MiniDFS storage and reports it as corrupt */ private void removeAndReportBlock(DistributedFileSystem blockDfs, Path filePath, LocatedBlock block) throws IOException { TestRaidDfs.corruptBlock(cluster, filePath, block.getBlock(), NUM_DATANODES, true); // report deleted block to the name node LocatedBlock[] toReport = { block }; blockDfs.getClient().getNamenode().reportBadBlocks(toReport); }
/** * Test to run a filter */ private void doTestPathFilter(int iter, long targetReplication, long metaReplication, long stripeLength, long blockSize, int numBlock) throws Exception { LOG.info("doTestPathFilter started---------------------------:" + " iter " + iter + " blockSize=" + blockSize + " stripeLength=" + stripeLength); ConfigBuilder cb = new ConfigBuilder(CONFIG_FILE); cb.addPolicy("policy1", "/user/dhruba/raidtest", targetReplication, metaReplication); cb.addPolicy("policy2", "/user/dhruba/dir-raidtest", targetReplication, metaReplication, "dir-xor"); cb.persist(); RaidShell shell = null; Path dir = new Path("/user/dhruba/raidtest/"); Path file1 = new Path(dir + "/file" + iter); Path dir1 = new Path("/user/dhruba/dir-raidtest/1"); Path file2 = new Path(dir1 + "/file2"); Path file3 = new Path(dir1 + "/file3"); RaidNode cnode = null; try { Path destPath = new Path("/raid/user/dhruba/raidtest"); Path destPath1 = new Path("/dir-raid/user/dhruba/dir-raidtest"); fileSys.delete(dir, true); fileSys.delete(destPath, true); fileSys.delete(dir1, true); fileSys.delete(destPath1, true); long crc1 = createOldFile(fileSys, file1, 1, numBlock, blockSize); long crc2 = createOldFile(fileSys, file2, 1, numBlock, blockSize); long crc3 = createOldFile(fileSys, file3, 1, numBlock, blockSize); LOG.info("doTestPathFilter created test files for iteration " + iter); // create an instance of the RaidNode Configuration localConf = new Configuration(conf); cnode = RaidNode.createRaidNode(null, localConf); TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath); TestRaidDfs.waitForDirRaided(LOG, fileSys, dir1, destPath1); LOG.info("doTestPathFilter all files found in Raid."); // check for error at beginning of file shell = new RaidShell(conf); shell.initializeRpc(conf, cnode.getListenerAddress()); this.simulateErrors(shell, file1, crc1, blockSize, numBlock, stripeLength); this.simulateErrors(shell, file2, crc2, blockSize, numBlock, stripeLength); this.simulateErrors(shell, file3, 
crc3, blockSize, numBlock, stripeLength); } catch (Exception e) { LOG.info("doTestPathFilter Exception ", e); throw e; } finally { if (shell != null) shell.close(); if (cnode != null) { cnode.stop(); cnode.join(); } LOG.info("doTestPathFilter clean up" ); fileSys.delete(dir, true); fileSys.delete(new Path("/raid"), true); fileSys.delete(dir1, true); fileSys.delete(new Path("/dir-raid"), true); } LOG.info("doTestPathFilter completed:" + " blockSize=" + blockSize + " stripeLength=" + stripeLength); }
/**
 * Checks that files and directories covered by a policy are raided, and are
 * raided again after the source file is re-created or the source directory's
 * modification time changes because a new file was added.
 *
 * Changes from the earlier version: the clean-up log message now names this
 * method (it was copied from doTestPathFilter), the source replication is
 * passed through the named constant, and an unused local was removed.
 */
private void doCheckPolicy() throws Exception {
  LOG.info("doCheckPolicy started---------------------------:");
  final int srcReplication = 3;
  short targetReplication = 2;
  long metaReplication = 1;
  long blockSize = 1024;
  int numBlock = 3;
  ConfigBuilder cb = new ConfigBuilder(CONFIG_FILE);
  cb.addPolicy("policy1", "/user/dhruba/policytest", targetReplication,
      metaReplication);
  cb.addPolicy("policy2", "/user/dhruba/dir-policytest", targetReplication,
      metaReplication, "dir-xor");
  cb.persist();
  Path dir = new Path("/user/dhruba/policytest/");
  Path dir1 = new Path("/user/dhruba/dir-policytest/1");
  Path file1 = new Path(dir + "/file1");
  Path file2 = new Path(dir1 + "/file2");
  Path file3 = new Path(dir1 + "/file3");
  Path file4 = new Path(dir1 + "/file4");
  RaidNode cnode = null;
  try {
    Path destPath = new Path("/raid/user/dhruba/policytest");
    Path destPath1 = new Path("/dir-raid/user/dhruba/dir-policytest");
    fileSys.delete(dir, true);
    fileSys.delete(destPath, true);
    fileSys.delete(dir1, true);
    fileSys.delete(destPath1, true);

    // create an instance of the RaidNode
    Configuration localConf = new Configuration(conf);
    cnode = RaidNode.createRaidNode(null, localConf);

    // these files should be picked up by the RaidNode
    createOldFile(fileSys, file1, srcReplication, numBlock, blockSize);
    createOldFile(fileSys, file2, srcReplication, numBlock, blockSize);
    createOldFile(fileSys, file3, srcReplication, numBlock, blockSize);
    TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath,
        targetReplication);
    TestRaidDfs.waitForDirRaided(LOG, fileSys, dir1, destPath1,
        targetReplication);
    LOG.info("doCheckPolicy all files found in Raid the first time.");

    LOG.info("doCheckPolicy: recreating source file");
    long firstmodetime1 = fileSys.getFileStatus(file1).getModificationTime();
    createOldFile(fileSys, file1, srcReplication, numBlock, blockSize);
    assertTrue(fileSys.getFileStatus(file1).getModificationTime() >
        firstmodetime1);

    LOG.info("Change the modification time of directory");
    long firstmodetime2 = fileSys.getFileStatus(dir1).getModificationTime();
    createOldFile(fileSys, file4, srcReplication, numBlock, blockSize);
    assertTrue(fileSys.getFileStatus(dir1).getModificationTime() >
        firstmodetime2);

    TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath,
        targetReplication);
    TestRaidDfs.waitForDirRaided(LOG, fileSys, dir1, destPath1,
        targetReplication);
    LOG.info("doCheckPolicy: file got re-raided as expected.");
  } catch (Exception e) {
    LOG.info("doCheckPolicy Exception ", e);
    throw e;
  } finally {
    if (cnode != null) {
      cnode.stop();
      cnode.join();
    }
    LOG.info("doCheckPolicy clean up");
    fileSys.delete(dir, true);
    fileSys.delete(new Path("/raid"), true);
    fileSys.delete(dir1, true);
    fileSys.delete(new Path("/dir-raid"), true);
  }
  LOG.info("doCheckPolicy completed:");
}
/**
 * For each of the {@code nfiles} test files, waits until it is raided and
 * its parity replication has been reduced, checks the replication of source
 * and parity, and (for codecs with more than one parity block) checks per
 * stripe that no single node holds as many parity blocks as the parity
 * length.
 *
 * assertEquals is called as (expected, actual) so that failure messages
 * report the values the right way round.
 *
 * @param srcDir directory that contains file0..file(nfiles-1)
 * @param parityDir parity directory (file raid) or parity file (dir raid)
 * @param stripeLength number of source blocks per stripe
 * @param targetReplication expected replication of the source files
 * @param metaReplication expected replication of the parity files
 * @param pm placement monitor used to look up parity block locations
 * @param codec codec the files were raided with
 * @param nfiles number of test files to check
 */
private void checkTestFiles(String srcDir, String parityDir, int stripeLength, short targetReplication, short metaReplication, PlacementMonitor pm, Codec codec, int nfiles) throws IOException, InterruptedException {
  for (int i = 0; i < nfiles; i++) {
    Path srcPath = new Path(srcDir, "file" + i);
    Path parityPath = null;
    if (codec.isDirRaid) {
      parityPath = new Path(parityDir);
      TestRaidDfs.waitForDirRaided(LOG, fileSys, srcPath.getParent(),
          parityPath.getParent(), targetReplication);
    } else {
      parityPath = new Path(parityDir, "file" + i);
      TestRaidDfs.waitForFileRaided(LOG, fileSys, srcPath,
          parityPath.getParent(), targetReplication);
    }
    TestRaidDfs.waitForReplicasReduction(fileSys, parityPath,
        targetReplication);
    FileStatus srcFile = fileSys.getFileStatus(srcPath);
    FileStatus parityStat = fileSys.getFileStatus(parityPath);
    assertEquals(targetReplication, srcFile.getReplication());
    assertEquals(metaReplication, parityStat.getReplication());
    List<BlockInfo> parityBlocks = pm.getBlockInfos(fileSys, parityStat);

    int parityLength = codec.parityLength;
    if (parityLength == 1) {
      // A single parity block per stripe: nothing to check for placement.
      continue;
    }
    if (codec.isDirRaid && i > 0) {
      // One directory has one parity, just need to check once
      continue;
    }

    long numBlocks;
    if (codec.isDirRaid) {
      List<FileStatus> lfs = RaidNode.listDirectoryRaidFileStatus(conf,
          fileSys, new Path(srcDir));
      numBlocks = DirectoryStripeReader.getBlockNum(lfs);
    } else {
      numBlocks = RaidNode.numBlocks(srcFile);
    }
    int numStripes = (int)RaidNode.numStripes(numBlocks, stripeLength);

    Map<String, Integer> nodeToNumBlocks = new HashMap<String, Integer>();
    Set<String> nodesInThisStripe = new HashSet<String>();
    for (int stripeIndex = 0; stripeIndex < numStripes; ++stripeIndex) {
      List<BlockInfo> stripeBlocks = new ArrayList<BlockInfo>();
      // Adding parity blocks
      int stripeStart = parityLength * stripeIndex;
      int stripeEnd = Math.min(stripeStart + parityLength,
          parityBlocks.size());
      if (stripeStart < stripeEnd) {
        stripeBlocks.addAll(parityBlocks.subList(stripeStart, stripeEnd));
      }
      PlacementMonitor.countBlocksOnEachNode(stripeBlocks, nodeToNumBlocks,
          nodesInThisStripe);
      LOG.info("file: " + parityPath + " stripe: " + stripeIndex);

      // Largest number of this stripe's parity blocks found on one node.
      int max = 0;
      for (Map.Entry<String, Integer> entry : nodeToNumBlocks.entrySet()) {
        int count = entry.getValue();
        LOG.info("node:" + entry.getKey() + " count:" + count);
        max = Math.max(max, count);
      }
      assertTrue("parity blocks in a stripe cannot live in the same node",
          max < parityLength);
    }
  }
}
/**
 * Raids a directory, corrupts three blocks of its parity file, restarts the
 * RaidNode, and waits (up to two minutes) for the parity file to be fixed;
 * the repaired parity file must have the same CRC as before the corruption.
 *
 * @param testName name used in log messages
 * @param local forwarded to getRaidNodeConfig and to the metrics check
 */
private void implParityBlockFix(String testName, boolean local) throws Exception {
  LOG.info("Test " + testName + " started.");
  final int stripeLength = 3;
  mySetup(stripeLength);
  final long[] crcs = new long[3];
  final int[] seeds = new int[3];
  final Path srcDir = new Path("/user/dhruba/raidtest");
  TestRaidDfs.createTestFiles(srcDir, fileSizes, blockSizes, crcs, seeds,
      fileSys, (short)1);
  final Path parityParent = new Path("/destraid/user/dhruba");
  final Path parityPath = new Path("/destraid/user/dhruba/raidtest");
  LOG.info("Test " + testName + " created test files");
  final Configuration raidNodeConf = this.getRaidNodeConfig(conf, local);

  try {
    // Let a RaidNode raid the directory, then stop it again.
    cnode = RaidNode.createRaidNode(null, raidNodeConf);
    TestRaidDfs.waitForDirRaided(LOG, fileSys, srcDir, parityParent);
    cnode.stop();
    cnode.join();

    final long crcBefore = RaidDFSUtil.getCRC(fileSys, parityPath);
    final FileStatus parityStatus = fileSys.getFileStatus(parityPath);
    final DistributedFileSystem typedFs = (DistributedFileSystem)fileSys;
    final LocatedBlocks parityLocs = RaidDFSUtil.getBlockLocations(
        typedFs, parityPath.toUri().getPath(), 0, parityStatus.getLen());

    String[] corrupt = DFSUtil.getCorruptFiles(typedFs);
    assertEquals("no corrupt files expected", 0, corrupt.length);
    assertEquals("filesFixed() should return 0 before fixing files",
        0, cnode.blockIntegrityMonitor.getNumFilesFixed());

    // Corrupt parity blocks for different stripes.
    final int[] victims = new int[]{0, 1, 2};
    for (int v = 0; v < victims.length; v++) {
      corruptBlock(parityLocs.get(victims[v]).getBlock(), dfsCluster);
    }
    RaidDFSUtil.reportCorruptBlocks(typedFs, parityPath, victims, 2*blockSize);

    corrupt = DFSUtil.getCorruptFiles(typedFs);
    assertEquals("file not corrupted", 1, corrupt.length);
    assertEquals("wrong file corrupted", corrupt[0],
        parityPath.toUri().getPath());

    // Restart the RaidNode and poll until it reports a fixed file.
    cnode = RaidNode.createRaidNode(null, raidNodeConf);
    final long waitStart = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - waitStart < 120000) {
      LOG.info("Test " + testName + " waiting for files to be fixed.");
      Thread.sleep(3000);
    }
    TestBlockFixer.verifyMetrics(fileSys, cnode, local, 1L, victims.length);

    final long crcAfter = RaidDFSUtil.getCRC(fileSys, parityPath);
    assertEquals("file not fixed", crcBefore, crcAfter);
  } catch (Exception e) {
    LOG.info("Test " + testName + " Exception " + e +
        StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test " + testName + " completed.");
}
/**
 * Raids a directory, corrupts two blocks and waits for the block fixer to
 * submit at least one job; then corrupts two more blocks and waits for at
 * least one additional job to be submitted.
 *
 * The corrupted block indices are logged with Arrays.toString; concatenating
 * the array directly would log only its type and identity hash.
 */
public void testMultiplePriorities() throws Exception {
  long[] crcs = new long[3];
  int[] seeds = new int[3];
  Path dirPath = new Path("/home/test");
  int stripeLength = 3;
  short repl = 1;
  mySetup(stripeLength);
  Codec codec = Codec.getCodec("rs");
  LOG.info("Starting testMultiplePriorities");
  try {
    // Create test file and raid it.
    Path[] files = TestRaidDfs.createTestFiles(dirPath, fileSizes, blockSizes,
        crcs, seeds, fileSys, (short)1);
    FileStatus stat = fileSys.getFileStatus(dirPath);
    RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec,
        new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
        false, repl, repl);

    Integer[] corruptBlockIdxs = new Integer[]{0, 2};
    LOG.info("Corrupt block " + java.util.Arrays.toString(corruptBlockIdxs) +
        " of directory " + dirPath);
    TestDirectoryRaidDfs.corruptBlocksInDirectory(conf, dirPath, crcs,
        corruptBlockIdxs, fileSys, dfsCluster, false, true);

    // Create Block Fixer and fix.
    FakeDistBlockIntegrityMonitor distBlockFixer =
        new FakeDistBlockIntegrityMonitor(conf);
    assertEquals(0, distBlockFixer.submittedJobs.size());

    // waiting for one job to submit
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < 120000 &&
           distBlockFixer.submittedJobs.size() == 0) {
      distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
      LOG.info("Waiting for jobs to submit");
      Thread.sleep(10000);
    }
    int submittedJob = distBlockFixer.submittedJobs.size();
    LOG.info("Already Submitted " + submittedJob + " jobs");
    assertTrue("Should submit more than 1 jobs", submittedJob >= 1);

    // Corrupt two more blocks
    corruptBlockIdxs = new Integer[]{4, 5};
    LOG.info("Corrupt block " + java.util.Arrays.toString(corruptBlockIdxs) +
        " of directory " + dirPath);
    TestDirectoryRaidDfs.corruptBlocksInDirectory(conf, dirPath, crcs,
        corruptBlockIdxs, fileSys, dfsCluster, false, true);

    // A new job should be submitted since two blocks are corrupt.
    startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < 120000 &&
           distBlockFixer.submittedJobs.size() == submittedJob) {
      distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
      LOG.info("Waiting for more jobs to submit");
      Thread.sleep(10000);
    }
    LOG.info("Already Submitted " + distBlockFixer.submittedJobs.size() +
        " jobs");
    assertTrue("should submit more than 1 jobs",
        distBlockFixer.submittedJobs.size() - submittedJob >= 1);
  } finally {
    myTearDown();
  }
}
public void implReadFromStripeInfo(int operatorId) throws IOException { final String testName = "testReadFromStripeInfo"; LOG.info("Test " + testName + " started."); int stripeLength = 3; mySetup(stripeLength); long[] crcs = new long[3]; int[] seeds = new int[3]; Path dirPath =new Path("/user/dikang/raidtestrs"); Path[] files = TestRaidDfs.createTestFiles(dirPath, fileSizes, blockSizes, crcs, seeds, fileSys, (short)1); LOG.info("Test " + testName + "created test files"); FileStatus stat = fileSys.getFileStatus(dirPath); Codec codec = Codec.getCodec("rs"); Path destPath = new Path("/destraidrs"); try { RaidNode.doRaid(conf, stat, destPath, codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, 1, 1); DistributedFileSystem dfs = (DistributedFileSystem) fileSys; String[] corruptFiles = DFSUtil.getCorruptFiles(dfs); assertEquals("no corrupt files expected", 0, corruptFiles.length); // corrupt files this.corruptFiles(dirPath, crcs, rsCorruptFileIdx1, dfs, files, rsNumCorruptBlocksInFiles1); if (operatorId == 0) { // delete file dfs.delete(files[0], true); LOG.info("Delete file: " + files[0]); } else if (operatorId == 1) { // add file TestRaidDfs.createTestFile(dfs, new Path(dirPath, "file3"), (short)1, 5, blockSize); } else { // rename file Path newFile = new Path(files[0].toUri().getPath() + "_rename"); dfs.rename(files[0], newFile); files[0] = newFile; } FileStatus newDirStat = dfs.getFileStatus(dirPath); // assert the modification is changed. assertNotSame(stat.getModificationTime(), newDirStat.getModificationTime()); // verify the files for (int i = 1; i < fileSizes.length; i++) { assertTrue("file " + files[i] + " not fixed", TestRaidDfs.validateFile(getRaidFS(), files[i], fileSizes[i], crcs[i])); } } finally { myTearDown(); } }
public void testMultiplePriorities() throws Exception { LOG.info("Test testMultiplePriorities started."); Path srcFile = new Path("/home/test/file1"); int repl = 1; int numBlocks = 8; long blockSize = 16384; int stripeLength = 3; Path destPath = new Path("/destraidrs"); mySetup(stripeLength, -1); // never har Codec codec = Codec.getCodec("rs"); LOG.info("Starting testMultiplePriorities"); try { // Create test file and raid it. TestRaidDfs.createTestFilePartialLastBlock( fileSys, srcFile, repl, numBlocks, blockSize); FileStatus stat = fileSys.getFileStatus(srcFile); RaidNode.doRaid(conf, stat, destPath, codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, repl, repl); // Corrupt first block of file. int blockIdxToCorrupt = 1; LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile); LocatedBlocks locations = getBlockLocations(srcFile, stat.getLen()); corruptBlock(locations.get(blockIdxToCorrupt).getBlock(), dfsCluster); RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{1}, blockSize); // Create Block Fixer and fix. FakeDistBlockIntegrityMonitor distBlockFixer = new FakeDistBlockIntegrityMonitor(conf); assertEquals(0, distBlockFixer.submittedJobs.size()); // waiting for one job to submit long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime < 120000 && distBlockFixer.submittedJobs.size() == 0) { distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks(); LOG.info("Waiting for jobs to submit"); Thread.sleep(10000); } int submittedJob = distBlockFixer.submittedJobs.size(); LOG.info("Already Submitted " + submittedJob + " jobs"); assertTrue("Should submit more than 1 jobs", submittedJob >= 1); // Corrupt one more block. 
blockIdxToCorrupt = 4; LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile); locations = getBlockLocations(srcFile, stat.getLen()); corruptBlock(locations.get(blockIdxToCorrupt).getBlock(), dfsCluster); RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{4}, blockSize); // A new job should be submitted since two blocks are corrupt. startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime < 120000 && distBlockFixer.submittedJobs.size() == submittedJob) { distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks(); LOG.info("Waiting for more jobs to submit"); Thread.sleep(10000); } LOG.info("Already Submitted " + distBlockFixer.submittedJobs.size() + " jobs"); assertTrue("Should submit more than 1 jobs", distBlockFixer.submittedJobs.size() - submittedJob >= 1); } finally { myTearDown(); } }
public void testFileCheck() throws Exception { LOG.info("Test FileCheck started."); mySetup(3, -1); File fileList = null; try { MiniMRCluster mr = new MiniMRCluster(4, namenode, 3); String jobTrackerName = "localhost:" + mr.getJobTrackerPort(); conf.set("mapred.job.tracker", jobTrackerName); Path srcPath = new Path("/user/dikang/raidtest/file0"); TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L); Codec codec = Codec.getCodec("xor"); doRaid(srcPath, codec); FileStatus stat = fileSys.getFileStatus(srcPath); ParityFilePair pfPair = ParityFilePair.getParityFile(codec, stat, conf); assertNotNull(pfPair); // write the filelist fileList = new File(TEST_DIR + "/" + UUID.randomUUID().toString()); BufferedWriter writer = new BufferedWriter(new FileWriter(fileList)); writer.write(fileList.getPath() + "\n"); writer.close(); // Create RaidShell RaidShell shell = new RaidShell(conf); String[] args = new String[4]; args[0] = "-fileCheck"; args[1] = "-filesPerJob"; args[2] = "1"; args[3] = fileList.getPath(); assertEquals(0, ToolRunner.run(shell, args)); // test check source only // delete the parity file fileSys.delete(pfPair.getPath()); args = new String[5]; args[0] = "-fileCheck"; args[1] = "-filesPerJob"; args[2] = "1"; args[3] = "-sourceOnly"; args[4] = fileList.getPath(); assertEquals(0, ToolRunner.run(shell, args)); } finally { if (null != fileList) { fileList.delete(); } myTearDown(); } }
/**
 * Renames a raided file and then a directory containing a raided file
 * through the raid file system, checking after each rename that a parity
 * file can be found for the new location. For the single-file rename it also
 * checks that the old parity path is gone. Finally verifies that renaming a
 * path that does not exist returns false.
 */
public void testRename() throws Exception {
  try {
    mySetup("xor", 1);

    final Path fileBefore = new Path("/user/dhruba/raidtest/rename/f1");
    final Path fileAfter = new Path("/user/dhruba/raidtest/rename/f2");
    final Path siblingFile = new Path("/user/dhruba/raidtest/rename/f3");
    final Path dirAfter = new Path("/user/dhruba/raidtest/rename2");
    final Path siblingAfter = new Path("/user/dhruba/raidtest/rename2/f3");

    TestRaidDfs.createTestFilePartialLastBlock(fileSys, fileBefore, 1, 8, 8192L);
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, siblingFile, 1, 8, 8192L);

    final DistributedRaidFileSystem raidFs = getRaidFS();
    assertTrue(raidFs.exists(fileBefore));
    assertFalse(raidFs.exists(fileAfter));

    // Raid both files so that each has a parity file.
    doRaid(fileBefore, Codec.getCodec("xor"));
    doRaid(siblingFile, Codec.getCodec("xor"));

    final FileStatus statBefore = fileSys.getFileStatus(fileBefore);
    final ParityFilePair parityBefore =
        ParityFilePair.getParityFile(Codec.getCodec("xor"), statBefore, conf);
    final Path parityPathBefore = parityBefore.getPath();
    assertTrue(raidFs.exists(parityPathBefore));
    assertFalse(raidFs.exists(fileAfter));

    // --- rename a single file ---
    assertTrue(raidFs.rename(fileBefore, fileAfter));

    assertFalse(raidFs.exists(fileBefore));
    assertTrue(raidFs.exists(fileAfter));
    assertFalse(raidFs.exists(parityPathBefore));
    final FileStatus statAfter = fileSys.getFileStatus(fileAfter);
    final ParityFilePair parityAfter =
        ParityFilePair.getParityFile(Codec.getCodec("xor"), statAfter, conf);
    assertTrue(raidFs.exists(parityAfter.getPath()));

    // --- rename the enclosing directory ---
    assertFalse(raidFs.exists(dirAfter));
    assertTrue(raidFs.rename(siblingFile.getParent(), dirAfter));

    assertFalse(raidFs.exists(siblingFile.getParent()));
    assertTrue(raidFs.exists(dirAfter));
    final FileStatus siblingStatAfter = fileSys.getFileStatus(siblingAfter);
    final ParityFilePair siblingParityAfter =
        ParityFilePair.getParityFile(Codec.getCodec("xor"), siblingStatAfter, conf);
    assertTrue(raidFs.exists(siblingParityAfter.getPath()));

    // --- rename of a missing path must report failure ---
    final Path missingFrom = new Path("/user/dhruba/raidtest/raidnotexist");
    final Path missingTo = new Path("/user/dhruba/raidtest/raidnotexist2");
    assertFalse(raidFs.rename(missingFrom, missingTo));
  } finally {
    stopCluster();
  }
}
public void testIgnoreCheckingParity() throws Exception { try { mySetup("xor", 1); Path srcPath = new Path("/tmp/raidtest/delete/f1"); Path srcPath2 = new Path("/tmp/raidtest/rename/f2"); TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L); TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 1, 8, 8192L); DistributedRaidFileSystem raidFs = getRaidFS(); assertTrue(raidFs.exists(srcPath)); assertTrue(raidFs.exists(srcPath2)); // generate the parity files. doRaid(srcPath, Codec.getCodec("xor")); doRaid(srcPath2, Codec.getCodec("xor")); FileStatus srcStat = fileSys.getFileStatus(srcPath); FileStatus srcStat2 = fileSys.getFileStatus(srcPath2); ParityFilePair parity = ParityFilePair.getParityFile( Codec.getCodec("xor"), srcStat, conf); Path srcParityPath = parity.getPath(); ParityFilePair parity2 = ParityFilePair.getParityFile( Codec.getCodec("xor"), srcStat2, conf); Path srcParityPath2 = parity2.getPath(); assertTrue(raidFs.exists(srcParityPath)); assertTrue(raidFs.exists(srcParityPath2)); // test delete file raidFs.delete(srcPath); // verify we did not delete the parityPath assertFalse(raidFs.exists(srcPath)); assertTrue(raidFs.exists(srcParityPath)); // test rename file raidFs.rename(srcPath2, new Path("/tmp/raidtest/rename/f3")); // verify we did not rename the parityPath assertFalse(raidFs.exists(srcPath2)); assertTrue(raidFs.exists(srcParityPath2)); } finally { stopCluster(); } }
public void testIgnoreCheckingParity2() throws Exception { try { mySetup("xor", 1); conf.set(DistributedRaidFileSystem.DIRECTORIES_IGNORE_PARITY_CHECKING_KEY, "/ignore1/test/,/ignore2/test/"); Path srcPath = new Path("/ignore1/test/rename/f1"); Path srcPath2 = new Path("/ignore2/test/rename/f1"); TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 8, 8192L); TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath2, 1, 8, 8192L); DistributedRaidFileSystem raidFs = getRaidFS(); assertTrue(raidFs.exists(srcPath)); assertTrue(raidFs.exists(srcPath2)); // generate the parity files. doRaid(srcPath, Codec.getCodec("xor")); doRaid(srcPath2, Codec.getCodec("xor")); FileStatus srcStat = fileSys.getFileStatus(srcPath); FileStatus srcStat2 = fileSys.getFileStatus(srcPath2); ParityFilePair parity = ParityFilePair.getParityFile( Codec.getCodec("xor"), srcStat, conf); Path srcParityPath = parity.getPath(); ParityFilePair parity2 = ParityFilePair.getParityFile( Codec.getCodec("xor"), srcStat2, conf); Path srcParityPath2 = parity2.getPath(); assertTrue(raidFs.exists(srcParityPath)); assertTrue(raidFs.exists(srcParityPath2)); // test rename files raidFs.rename(srcPath, new Path("/ignore1/test/rename/f2")); raidFs.rename(srcPath2, new Path("/ignore2/test/rename/f2")); // verify we did not rename the parityPath assertFalse(raidFs.exists(srcPath)); assertTrue(raidFs.exists(srcParityPath)); assertFalse(raidFs.exists(srcPath2)); assertTrue(raidFs.exists(srcParityPath2)); } finally { stopCluster(); } }
/** * Create parity file, delete original file and then validate that * parity file is automatically deleted. */ private void doTestPurge(int iter, long targetReplication, long metaReplication, long stripeLength, long blockSize, int numBlock) throws Exception { LOG.info("doTestPurge started---------------------------:" + " iter " + iter + " blockSize=" + blockSize + " stripeLength=" + stripeLength); mySetup(targetReplication, metaReplication); Utils.loadTestCodecs(conf, new Builder[] { Utils.getXORBuilder().setStripeLength(stripeLength), Utils.getXORBuilder().setStripeLength(stripeLength).dirRaid( true).setParityDir("/dir-raid").setCodeId("dir-xor") }); Path dir = new Path("/user/dhruba/raidtest/"); Path file1 = new Path(dir + "/file" + iter); Path dir1 = new Path("/user/dhruba/dirraidtest/" + iter); Path file2 = new Path(dir1 + "/file1"); Path file3 = new Path(dir1 + "/file2"); RaidNode cnode = null; try { List<Path> destPaths = new ArrayList<Path>(); Path destPath1 = new Path("/raid/user/dhruba/raidtest"); destPaths.add(destPath1); Path destPath2 = new Path("/dir-raid/user/dhruba/dirraidtest"); destPaths.add(destPath2); fileSys.delete(dir, true); fileSys.delete(dir1, true); fileSys.delete(destPath1, true); fileSys.delete(destPath2, true); TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize); TestRaidNode.createOldFile(fileSys, file2, 1, numBlock, blockSize); TestRaidNode.createOldFile(fileSys, file3, 1, numBlock, blockSize); LOG.info("doTestPurge created test files for iteration " + iter); // create an instance of the RaidNode Configuration localConf = new Configuration(conf); cnode = RaidNode.createRaidNode(null, localConf); TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath1); TestRaidDfs.waitForDirRaided(LOG, fileSys, dir1, destPath2); LOG.info("doTestPurge all files found in Raid."); // delete original file assertTrue("Unable to delete original file " + file1 , fileSys.delete(file1, true)); LOG.info("deleted file " + file1); // delete 
original directory assertTrue("Unable to delete original directory " + dir1, fileSys.delete(dir1, true)); LOG.info("deleted directory " + dir1); waitFilesDelete(destPaths); } catch (Exception e) { LOG.info("doTestPurge Exception " + e + StringUtils.stringifyException(e)); throw e; } finally { if (cnode != null) { cnode.stop(); cnode.join(); } LOG.info("doTestPurge delete file " + file1); fileSys.delete(file1, true); fileSys.delete(dir1, true); } LOG.info("doTestPurge completed:" + " blockSize=" + blockSize + " stripeLength=" + stripeLength); }
/** * Create parity file, delete original file's directory and then validate that * parity directory is automatically deleted. */ public void testPurgeDirectory() throws Exception { long stripeLength = 5; long blockSize = 8192; long targetReplication = 1; long metaReplication = 1; int numBlock = 9; createClusters(true, 3); mySetup(targetReplication, metaReplication); Utils.loadTestCodecs(conf, new Builder[] { Utils.getXORBuilder().setStripeLength(stripeLength), Utils.getXORBuilder().setStripeLength(stripeLength).dirRaid( true).setParityDir("/dir-raid").setCodeId("dir-xor") }); Path dir = new Path("/user/dhruba/raidtest/"); Path file1 = new Path(dir + "/file1"); Path dir1 = new Path("/user/dhruba/dirraidtest/1"); Path file2 = new Path(dir1 + "/file2"); Path file3 = new Path(dir1 + "/file3"); RaidNode cnode = null; try { List<Path> destPaths = new ArrayList<Path>(); Path destPath1 = new Path("/raid/user/dhruba/raidtest"); destPaths.add(destPath1); Path destPath2 = new Path("/dir-raid/user/dhruba/dirraidtest"); destPaths.add(destPath2); TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize); TestRaidNode.createOldFile(fileSys, file2, 1, numBlock, blockSize); TestRaidNode.createOldFile(fileSys, file3, 1, numBlock, blockSize); // create an instance of the RaidNode Configuration localConf = new Configuration(conf); cnode = RaidNode.createRaidNode(null, localConf); TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath1); TestRaidDfs.waitForDirRaided(LOG, fileSys, dir1, destPath2); // delete original directory. 
assertTrue("Unable to delete original directory " + file1 , fileSys.delete(file1.getParent(), true)); LOG.info("deleted directory " + file1.getParent()); // delete original directory assertTrue("Unable to delete original direcotry" + dir1, fileSys.delete(dir1.getParent(), true)); LOG.info("deleted directory " + dir1.getParent()); // wait till parity file and directory are automatically deleted long start = System.currentTimeMillis(); while ((fileSys.exists(destPath1) || fileSys.exists(destPath2)) && System.currentTimeMillis() - start < 120000) { LOG.info("testPurgeDirectory waiting for parity files to be removed."); Thread.sleep(1000); // keep waiting } assertFalse(fileSys.exists(destPath1)); assertFalse(fileSys.exists(destPath2)); } catch (Exception e) { LOG.info("testPurgeDirectory Exception " + e + StringUtils.stringifyException(e)); throw e; } finally { if (cnode != null) { cnode.stop(); cnode.join(); } LOG.info("testPurgeDirectory delete file " + file1); fileSys.delete(file1, true); fileSys.delete(dir1, true); stopClusters(); } }
/**
 * Checks that {@code doRaid} returns false, and that no parity path is
 * created, for four directory shapes: a non-leaf directory, an empty
 * directory, a directory holding only an empty file, and a directory holding
 * two one-block files.
 */
public void testAbnormalDirectory() throws Exception {
  mySetup();
  Codec codec = loadTestCodecs("xor", 4, true);
  try {
    final Path srcDir = new Path("/user/raid");
    final Path parityPath = new Path("/destraid/user/raid");
    assertTrue(fileSys.mkdirs(srcDir));

    LOG.info("Test non-leaf directory");
    boolean raided = doRaid(conf, fileSys, srcDir.getParent(), codec);
    assertFalse("Couldn't raid non-leaf directory ", raided);
    assertFalse(fileSys.exists(parityPath.getParent()));

    LOG.info("Test empty directory");
    raided = doRaid(conf, fileSys, srcDir, codec);
    assertFalse("Couldn't raid empty directory ", raided);
    assertFalse(fileSys.exists(parityPath));

    LOG.info("Test empty file in the directory");
    final Path zeroBlockFile = new Path(srcDir, "emptyFile");
    TestRaidDfs.createTestFile(fileSys, zeroBlockFile, 1, 0, 8192L);
    assertTrue(fileSys.exists(zeroBlockFile));
    raided = doRaid(conf, fileSys, srcDir, codec);
    assertFalse("No raidable files in the directory", raided);
    assertFalse(fileSys.exists(parityPath));

    LOG.info("Test not enough blocks in the directory");
    final Path firstFile = new Path(srcDir, "file1");
    final Path secondFile = new Path(srcDir, "file2");
    TestRaidDfs.createTestFile(fileSys, firstFile, 1, 1, 8192L);
    TestRaidDfs.createTestFile(fileSys, secondFile, 1, 1, 8192L);
    LOG.info("Created two files with two blocks in total");
    assertTrue(fileSys.exists(firstFile));
    assertTrue(fileSys.exists(secondFile));
    raided = doRaid(conf, fileSys, srcDir, codec);
    assertFalse("Not enough blocks in the directory", raided);
    assertFalse(fileSys.exists(parityPath));
  } finally {
    myTearDown();
  }
}
private void validateSingleFile(String code, FileSystem fileSys, Path sourceDir, int stripeLength, int blockNum, boolean lastPartial) throws Exception { LOG.info("Test file with " + blockNum + " blocks and " + (lastPartial? "partial": "full") + " last block"); Codec codec = loadTestCodecs(code, stripeLength, true); Path parityDir = new Path(codec.parityDirectory); RaidDFSUtil.cleanUp(fileSys, sourceDir); RaidDFSUtil.cleanUp(fileSys, parityDir); fileSys.mkdirs(sourceDir); Path file1 = new Path(sourceDir, "file1"); if (!lastPartial) { TestRaidDfs.createTestFile(fileSys, file1, 2, blockNum, 8192L); } else { TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1, 2, blockNum, 8192L); } Path parityFile = RaidNode.getOriginalParityFile(parityDir, sourceDir); // Do directory level raid LOG.info("Create a directory-raid parity file " + parityFile); assertTrue("Cannot raid directory " + sourceDir, doRaid(conf, fileSys, sourceDir, codec)); assertEquals("Modification time should be the same", fileSys.getFileStatus(sourceDir).getModificationTime(), fileSys.getFileStatus(parityFile).getModificationTime()); assertEquals("Replica num of source file should be reduced to 1", fileSys.getFileStatus(file1).getReplication(), 1); assertEquals("Replica num of parity file should be reduced to 1", fileSys.getFileStatus(parityFile).getReplication(), 1); long dirCRC = RaidDFSUtil.getCRC(fileSys, parityFile); long dirLen = fileSys.getFileStatus(parityFile).getLen(); // remove the parity dir RaidDFSUtil.cleanUp(fileSys, parityDir); codec = loadTestCodecs(code, stripeLength, false); Path parityFile1 = RaidNode.getOriginalParityFile(parityDir, file1); LOG.info("Create a file-raid parity file " + parityFile1); assertTrue("Cannot raid file " + file1, doRaid(conf, fileSys, file1, codec)); assertTrue("Parity file doesn't match when the file has " + blockNum + " blocks ", TestRaidDfs.validateFile(fileSys, parityFile1, dirLen, dirCRC)); }
/**
 * For every codec in {@code RaidDFSUtil.codes}: checks that a directory
 * holding a single two-block file is not raided, then runs
 * {@code validateSingleFile} for files of 3, stripeLength and
 * stripeLength + 2 blocks, each with a full and with a partial last block.
 */
public void testOneFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;

  // Block counts to validate, paired with the log line announcing each case.
  final int[] blockCounts = { 3, stripeLength, stripeLength + 2 };
  final String[] caseBanners = {
      "Test one file with blocks less than one stripe",
      "Test one file with one stripe blocks",
      "Test one file with more than one stripe blocks"
  };

  try {
    for (String code : RaidDFSUtil.codes) {
      LOG.info("testOneFileDirectory: Test code " + code);
      Codec codec = loadTestCodecs(code, stripeLength, true);
      Path sourceDir = new Path("/user/raid", code);
      assertTrue(fileSys.mkdirs(sourceDir));

      Path twoBlockFile = new Path(sourceDir, "twoBlockFile");
      LOG.info("Test one file with 2 blocks");
      TestRaidDfs.createTestFile(fileSys, twoBlockFile, 2, 2, 8192L);
      assertTrue(fileSys.exists(twoBlockFile));
      boolean raided = RaidNode.doRaid(conf,
          fileSys.getFileStatus(sourceDir),
          new Path(codec.parityDirectory), codec,
          new RaidNode.Statistics(),
          RaidUtils.NULL_PROGRESSABLE,
          false, 1, 1);
      assertFalse("Not enough blocks in the directory", raided);
      fileSys.delete(twoBlockFile, true);

      for (int c = 0; c < blockCounts.length; c++) {
        LOG.info(caseBanners[c]);
        validateSingleFile(code, fileSys, sourceDir, stripeLength,
            blockCounts[c], false);
        validateSingleFile(code, fileSys, sourceDir, stripeLength,
            blockCounts[c], true);
      }
    }
  } finally {
    myTearDown();
  }
}
public void testRenameOneFile() throws Exception { try { long[] crcs = new long[3]; int[] seeds = new int[3]; short repl = 1; Path dirPath = new Path("/user/dikang/raidtest"); mySetup(); DistributedRaidFileSystem raidFs = getRaidFS(); Path[] files = TestRaidDfs.createTestFiles(dirPath, fileSizes, blockSizes, crcs, seeds, fileSys, (short)1); FileStatus stat = raidFs.getFileStatus(dirPath); Codec codec = Codec.getCodec("dir-rs"); RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, repl, repl); Path destPath = new Path("/user/dikang/raidtest_new"); assertTrue(raidFs.exists(dirPath)); assertFalse(raidFs.exists(destPath)); ParityFilePair parity = ParityFilePair.getParityFile( codec, stat, conf); Path srcParityPath = parity.getPath(); assertTrue(raidFs.exists(srcParityPath)); raidFs.mkdirs(destPath); // do the rename file assertTrue(raidFs.rename(files[0], new Path(destPath, "file0"))); // verify the results. assertFalse(raidFs.exists(files[0])); assertTrue(raidFs.exists(new Path(destPath, "file0"))); assertTrue(raidFs.exists(srcParityPath)); // rename the left files assertTrue(raidFs.rename(files[1], new Path(destPath, "file1"))); assertTrue(raidFs.rename(files[2], new Path(destPath, "file2"))); assertFalse(raidFs.exists(srcParityPath)); Path newParityPath = new Path(codec.parityDirectory, "user/dikang/raidtest_new"); assertTrue(raidFs.exists(newParityPath)); } finally { stopCluster(); } }
public void testDeleteAndUndelete() throws Exception { try { long[] crcs = new long[3]; int[] seeds = new int[3]; short repl = 1; Path dirPath = new Path("/user/dikang/raidtest"); mySetup(); DistributedRaidFileSystem raidFs = getRaidFS(); Path[] files = TestRaidDfs.createTestFiles(dirPath, fileSizes, blockSizes, crcs, seeds, fileSys, (short)1); FileStatus stat = raidFs.getFileStatus(dirPath); Codec codec = Codec.getCodec("dir-rs"); RaidNode.doRaid(conf, stat, new Path(codec.parityDirectory), codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, repl, repl); ParityFilePair parity = ParityFilePair.getParityFile( codec, stat, conf); Path srcParityPath = parity.getPath(); assertTrue(raidFs.exists(srcParityPath)); // do the delete file assertTrue(raidFs.delete(dirPath)); // verify the results. assertFalse(raidFs.exists(dirPath)); assertFalse(raidFs.exists(srcParityPath)); // do the undelete using non-exist userName String nonExistedUser = UUID.randomUUID().toString(); assertFalse(raidFs.undelete(dirPath, nonExistedUser)); // verify the results assertFalse(raidFs.exists(dirPath)); assertFalse(raidFs.exists(srcParityPath)); // do the undelete file using current userName assertTrue(raidFs.undelete(dirPath, null)); //verify the results. assertTrue(raidFs.exists(dirPath)); assertTrue(raidFs.exists(srcParityPath)); } finally { stopCluster(); } }
private void implDirParityRegen(int operatorId) throws Exception { int stripeLength = 3; mySetup(stripeLength); long[] crcs = new long[3]; int[] seeds = new int[3]; Path dirPath =new Path("/user/dhruba/raidtestrs"); Path[] files = TestRaidDfs.createTestFiles(dirPath, fileSizes, blockSizes, crcs, seeds, fileSys, (short)1); Path destPath = new Path("/destraidrs/user/dhruba"); Configuration localConf = this.getRaidNodeConfig(conf); try { cnode = RaidNode.createRaidNode(null, localConf); TestRaidDfs.waitForDirRaided(LOG, fileSys, dirPath, destPath); cnode.stop(); cnode.join(); DistributedFileSystem dfs = (DistributedFileSystem) fileSys; String[] corruptFiles = DFSUtil.getCorruptFiles(dfs); assertEquals("no corrupt files expected", 0, corruptFiles.length); assertEquals("filesFixed() should return 0 before fixing files", 0, cnode.blockIntegrityMonitor.getNumFilesFixed()); // corrupt files this.corruptFiles(dirPath, crcs, rsCorruptFileIdx1, dfs, files, rsNumCorruptBlocksInFiles1); FileStatus dirStat = dfs.getFileStatus(dirPath); if (operatorId == 0) { // delete file dfs.delete(files[0], true); } else if (operatorId == 1) { // add file TestRaidDfs.createTestFile(dfs, new Path(dirPath, "file3"), (short)1, 5, blockSize); } else { // rename file Path newFile = new Path(files[0].toUri().getPath() + "_rename"); dfs.rename(files[0], newFile); files[0] = newFile; } FileStatus newDirStat = dfs.getFileStatus(dirPath); // assert the modification is changed. 
assertNotSame(dirStat.getModificationTime(), newDirStat.getModificationTime()); Codec codec = Codec.getCodec("rs"); ParityFilePair pfPair = ParityFilePair.getParityFile(codec, newDirStat, localConf); assertNull(pfPair); cnode = RaidNode.createRaidNode(null, localConf); // wait for the re-generation of the parity files TestRaidDfs.waitForDirRaided(LOG, dfs, dirPath, destPath, (short)1, 240000); TestBlockFixer.verifyMetrics(dfs, cnode, LOGTYPES.MODIFICATION_TIME_CHANGE, LOGRESULTS.NONE, 1L, true); TestBlockFixer.verifyMetrics(dfs, cnode, LOGTYPES.MODIFICATION_TIME_CHANGE, LOGRESULTS.NONE, codec.id, 1L, true); } catch (Exception e) { throw e; } finally { myTearDown(); } }
public void verifyDecoder(String code, int parallelism) throws Exception { Codec codec = Codec.getCodec(code); conf.setInt("raid.encoder.parallelism", parallelism); ConfigBuilder cb = new ConfigBuilder(CONFIG_FILE); cb.addPolicy("RaidTest1", "/user/dikang/raidtest/file" + code + parallelism, 1, 1, code); cb.persist(); Path srcPath = new Path("/user/dikang/raidtest/file" + code + parallelism + "/file1"); long blockSize = 8192 * 1024L; long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 1, 7, blockSize); doRaid(srcPath, codec); FileStatus srcStat = fileSys.getFileStatus(srcPath); ParityFilePair pair = ParityFilePair.getParityFile(codec, srcStat, conf); FileStatus file1Stat = fileSys.getFileStatus(srcPath); long length = file1Stat.getLen(); LocatedBlocks file1Loc = RaidDFSUtil.getBlockLocations((DistributedFileSystem)fileSys, srcPath.toUri().getPath(), 0, length); // corrupt file int[] corruptBlockIdxs = new int[] {5}; long errorOffset = 5 * blockSize; for (int idx: corruptBlockIdxs) { TestBlockFixer.corruptBlock(file1Loc.get(idx).getBlock(), dfsCluster); } RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath, corruptBlockIdxs, blockSize); Decoder decoder = new Decoder(conf, codec); ByteArrayOutputStream out = new ByteArrayOutputStream(); decoder.codec.simulateBlockFix = true; CRC32 oldCRC = decoder.fixErasedBlock(fileSys, srcStat, fileSys, pair.getPath(), true, blockSize, errorOffset, blockSize, false, out, null, null, false); decoder.codec.simulateBlockFix = false; out = new ByteArrayOutputStream(); decoder.fixErasedBlock(fileSys, srcStat, fileSys, pair.getPath(), true, blockSize, errorOffset, blockSize, false, out, null, null, false); // calculate the new crc CRC32 newCRC = new CRC32(); byte[] constructedBytes = out.toByteArray(); newCRC.update(constructedBytes); assertEquals(oldCRC.getValue(), newCRC.getValue()); }
public void testParityHarBadBlockFixer() throws Exception { LOG.info("Test testParityHarBlockFix started."); long blockSize = 8192L; int stripeLength = 3; mySetup(stripeLength, -1, "org.apache.hadoop.raid.BadXORCode", "org.apache.hadoop.raid.BadReedSolomonCode", "rs", true); Path file1 = new Path("/user/dhruba/raidtest/file1"); // Parity file will have 7 blocks. long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1, 1, 20, blockSize); LOG.info("Created test files"); // create an instance of the RaidNode Configuration localConf = new Configuration(conf); localConf.setInt(RaidNode.RAID_PARITY_HAR_THRESHOLD_DAYS_KEY, 0); localConf.set("raid.blockfix.classname", "org.apache.hadoop.raid.DistBlockIntegrityMonitor"); localConf.setLong("raid.blockfix.filespertask", 2L); try { cnode = RaidNode.createRaidNode(null, localConf); Path harDirectory = new Path("/destraidrs/user/dhruba/raidtest/raidtest" + RaidNode.HAR_SUFFIX); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < 1000 * 120) { if (fileSys.exists(harDirectory)) { break; } LOG.info("Waiting for har"); Thread.sleep(1000); } assertEquals(true, fileSys.exists(harDirectory)); cnode.stop(); cnode.join(); DistributedFileSystem dfs = (DistributedFileSystem)fileSys; String[] corruptFiles = DFSUtil.getCorruptFiles(dfs); assertEquals("no corrupt files expected", 0, corruptFiles.length); // Corrupt source blocks FileStatus stat = fileSys.getFileStatus(file1); LocatedBlocks locs = RaidDFSUtil.getBlockLocations( dfs, file1.toUri().getPath(), 0, stat.getLen()); int[] corruptBlockIdxs = new int[]{0}; for (int idx: corruptBlockIdxs) { TestBlockFixer.corruptBlock(locs.get(idx).getBlock(), dfsCluster); } RaidDFSUtil.reportCorruptBlocks(dfs, file1, corruptBlockIdxs, stat.getBlockSize()); corruptFiles = DFSUtil.getCorruptFiles(dfs); assertEquals("file not corrupted", 1, corruptFiles.length); assertEquals("wrong file corrupted", corruptFiles[0], file1.toUri().getPath()); cnode = 
RaidNode.createRaidNode(null, localConf); start = System.currentTimeMillis(); while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 && System.currentTimeMillis() - start < 120000) { LOG.info("Waiting for files to be fixed."); Thread.sleep(1000); } long checkCRC = RaidDFSUtil.getCRC(fileSys, file1); assertEquals("file not fixed", crc, checkCRC); // Verify the counters are right long expectedNumFailures = corruptBlockIdxs.length; assertEquals(expectedNumFailures, cnode.blockIntegrityMonitor.getNumBlockFixSimulationFailures()); assertEquals(0, cnode.blockIntegrityMonitor.getNumBlockFixSimulationSuccess()); } catch (Exception e) { LOG.info("Exception ", e); throw e; } finally { myTearDown(); } LOG.info("Test testParityHarBlockFix completed."); }
public void testTooManyErrorsDecode() throws Exception { LOG.info("testTooManyErrorsDecode start"); long blockSize = 8192L; stripeLength = 3; mySetup("xor", 1); long[][] fsizes = {{2000L, 3000L, 2000L}, {blockSize + 1, blockSize + 1}, {2*blockSize, blockSize + blockSize/2}}; long[][] bsizes = {{blockSize, blockSize, blockSize}, {blockSize, blockSize}, {2*blockSize, blockSize}}; Integer[][] corrupts = {{0, 1}, {0, 2}, {1, 2}}; try { for (String code: RaidDFSUtil.codes) { Codec curCodec = Codec.getCodec(code); Path srcDir = new Path("/user/dhruba/" + code); for (int i = 0; i < corrupts.length; i++) { for (int j = 0; j < fsizes.length; j++) { long[] crcs = new long[fsizes[j].length]; int[] seeds = new int[fsizes[j].length]; Path parityDir = new Path(codec.parityDirectory); RaidDFSUtil.cleanUp(fileSys, srcDir); RaidDFSUtil.cleanUp(fileSys, parityDir); TestRaidDfs.createTestFiles(srcDir, fsizes[j], bsizes[j], crcs, seeds, fileSys, (short)1); assertTrue(RaidNode.doRaid(conf, fileSys.getFileStatus(srcDir), new Path(curCodec.parityDirectory), curCodec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, 1, 1)); boolean expectedExceptionThrown = false; try { corruptBlocksInDirectory(conf, srcDir, crcs, corrupts[i], fileSys, dfs, true, false); // Should not reach. } catch (IOException e) { LOG.info("Expected exception caught" + e); expectedExceptionThrown = true; } assertTrue(expectedExceptionThrown); } } } LOG.info("testTooManyErrorsDecode complete"); } finally { myTearDown(); } }
public void testTooManyErrorsEncode() throws Exception { LOG.info("testTooManyErrorsEncode complete"); stripeLength = 3; mySetup("xor", 1); // Encoding should fail when even one block is corrupt. Random rand = new Random(); try { for (String code: RaidDFSUtil.codes) { Codec curCodec = Codec.getCodec(code); Path srcDir = new Path("/user/dhruba/" + code); for (int j = 0; j < fileSizes.length; j++) { long[] crcs = new long[fileSizes[j].length]; int[] seeds = new int[fileSizes[j].length]; Path parityDir = new Path(codec.parityDirectory); RaidDFSUtil.cleanUp(fileSys, srcDir); RaidDFSUtil.cleanUp(fileSys, parityDir); TestRaidDfs.createTestFiles(srcDir, fileSizes[j], blockSizes[j], crcs, seeds, fileSys, (short)1); corruptBlocksInDirectory(conf, srcDir, crcs, new Integer[]{rand.nextInt() % 3}, fileSys, dfs, false, false); boolean expectedExceptionThrown = false; try { RaidNode.doRaid(conf, fileSys.getFileStatus(srcDir), new Path(curCodec.parityDirectory), curCodec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE, false, 1, 1); // Should not reach. } catch (IOException e) { LOG.info("Expected exception caught" + e); expectedExceptionThrown = true; } assertTrue(expectedExceptionThrown); } } LOG.info("testTooManyErrorsEncode complete"); } finally { myTearDown(); } }