private void validateFile(FileSystem fileSys, Path name)
    throws IOException {
  FSDataInputStream stm = fileSys.open(name);
  final byte[] b = new byte[4192];
  int num = 0;
  boolean gotException = false;
  try {
    while (num >= 0) {
      num = stm.read(b);
      if (num < 0) {
        break;
      }
    }
  } catch (BlockMissingException e) {
    gotException = true;
  }
  stm.close();
  assertTrue("Expected BlockMissingException ", gotException);
}
@Test(timeout=60000)
public void testRemoveOneVolume()
    throws ReconfigurationException, InterruptedException, TimeoutException,
    IOException {
  startDFSCluster(1, 1);
  final short replFactor = 1;
  Path testFile = new Path("/test");
  createFile(testFile, 10, replFactor);

  DataNode dn = cluster.getDataNodes().get(0);
  Collection<String> oldDirs = getDataDirs(dn);
  String newDirs = oldDirs.iterator().next();  // Keep the first volume.
  dn.reconfigurePropertyImpl(
      DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
  assertFileLocksReleased(
      new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
  dn.scheduleAllBlockReport(0);

  try {
    DFSTestUtil.readFile(cluster.getFileSystem(), testFile);
    fail("Expected to throw BlockMissingException.");
  } catch (BlockMissingException e) {
    GenericTestUtils.assertExceptionContains("Could not obtain block", e);
  }

  Path newFile = new Path("/newFile");
  createFile(newFile, 6);

  String bpid = cluster.getNamesystem().getBlockPoolId();
  List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
      cluster.getAllBlockReports(bpid);
  assertEquals((int) replFactor, blockReports.size());

  BlockListAsLongs blocksForVolume1 =
      blockReports.get(0).values().iterator().next();
  // The first volume holds half of testFile's blocks and all of newFile's blocks.
  assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
}
@Test(timeout = 30000)
public void testSourceBlockRepair() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem) getFileSystem();
  TestDfsClient testDfsClient = new TestDfsClient(getConfig());
  testDfsClient.injectIntoDfs(dfs);

  FileStatus testFileStatus = dfs.getFileStatus(testFile);
  String path = testFileStatus.getPath().toUri().getPath();
  int blockToLoose = new Random(seed).nextInt(
      (int) (testFileStatus.getLen() / testFileStatus.getBlockSize()));
  LocatedBlock lb = dfs.getClient().getLocatedBlocks(path, 0, Long.MAX_VALUE)
      .get(blockToLoose);
  DataNodeUtil.loseBlock(getCluster(), lb);

  List<LocatedBlock> lostBlocks = new ArrayList<LocatedBlock>();
  lostBlocks.add(lb);
  LocatedBlocks locatedBlocks =
      new LocatedBlocks(0, false, lostBlocks, null, true);
  testDfsClient.setMissingLocatedBlocks(locatedBlocks);

  LocatedBlocks missingBlocks =
      new LocatedBlocks(testFileStatus.getLen(), false,
          new ArrayList<LocatedBlock>(), null, true);
  missingBlocks.getLocatedBlocks().add(lb);

  BlockReconstructor blockReconstructor = new BlockReconstructor(conf);
  Decoder decoder = new Decoder(conf, Util.getCodec(Util.Codecs.SRC));
  blockReconstructor
      .processFile(testFile, testParityFile, missingBlocks, decoder, null);

  // Block is recovered to the same data node so no need to wait for the block report
  try {
    FSDataInputStream in = dfs.open(testFile);
    byte[] buff = new byte[TEST_BLOCK_COUNT * DFS_TEST_BLOCK_SIZE];
    in.readFully(0, buff);
  } catch (BlockMissingException e) {
    LOG.error("Reading failed", e);
    Assert.fail("Repair failed. Missing a block.");
  }
}
@Test
public void testParityBlockRepair() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem) getFileSystem();
  TestDfsClient testDfsClient = new TestDfsClient(getConfig());
  testDfsClient.injectIntoDfs(dfs);

  FileStatus parityFileStatus = dfs.getFileStatus(testParityFile);
  String path = parityFileStatus.getPath().toUri().getPath();
  int blockToLoose = new Random(seed).nextInt(
      (int) (parityFileStatus.getLen() / parityFileStatus.getBlockSize()));
  LocatedBlock lb = dfs.getClient().getLocatedBlocks(path, 0, Long.MAX_VALUE)
      .get(blockToLoose);
  DataNodeUtil.loseBlock(getCluster(), lb);

  List<LocatedBlock> lostBlocks = new ArrayList<LocatedBlock>();
  lostBlocks.add(lb);
  LocatedBlocks locatedBlocks =
      new LocatedBlocks(0, false, lostBlocks, null, true);
  testDfsClient.setMissingLocatedBlocks(locatedBlocks);

  LocatedBlocks missingBlocks =
      new LocatedBlocks(parityFileStatus.getLen(), false,
          new ArrayList<LocatedBlock>(), null, true);
  missingBlocks.getLocatedBlocks().add(lb);

  BlockReconstructor blockReconstructor = new BlockReconstructor(conf);
  Decoder decoder = new Decoder(conf, Util.getCodec(Util.Codecs.SRC));
  blockReconstructor
      .processParityFile(testFile, testParityFile, missingBlocks, decoder,
          null);

  // Block is recovered to the same data node so no need to wait for the block report
  try {
    FSDataInputStream in = dfs.open(testParityFile);
    byte[] buff = new byte[DFS_TEST_BLOCK_SIZE * codec.parityLength];
    in.readFully(0, buff);
  } catch (BlockMissingException e) {
    LOG.error("Reading failed", e);
    Assert.fail("Repair failed. Missing a block.");
  }
}
@Test
public void testRaidFiles() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem) getFileSystem();
  MapReduceEncodingManager encodingManager =
      new MapReduceEncodingManager(conf);

  Util.createRandomFile(dfs, testFile, seed, TEST_BLOCK_COUNT,
      DFS_TEST_BLOCK_SIZE);
  Codec.initializeCodecs(conf);
  EncodingPolicy policy = new EncodingPolicy("src", (short) 1);
  encodingManager.encodeFile(policy, testFile, parityFile, false);

  List<Report> reports;
  while ((reports = encodingManager.computeReports()).size() > 0) {
    Assert.assertNotSame("Encoding failed.", Report.Status.FAILED,
        reports.get(0).getStatus());
    Thread.sleep(1000);
  }

  FileStatus parityStatus = dfs.getFileStatus(parityFile);
  Assert.assertEquals(parityStatus.getLen(), 6 * DFS_TEST_BLOCK_SIZE);
  try {
    FSDataInputStream in = dfs.open(parityFile);
    byte[] buff = new byte[6 * DFS_TEST_BLOCK_SIZE];
    in.readFully(0, buff);
  } catch (BlockMissingException e) {
    LOG.error("Reading parity failed", e);
    Assert.fail("Parity could not be read.");
  }
}
@Test
public void testFailover() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem) getFileSystem();
  MapReduceEncodingManager encodingManager =
      new MapReduceEncodingManager(mrCluster.getConfig());

  Util.createRandomFile(dfs, testFile, seed, TEST_BLOCK_COUNT,
      DFS_TEST_BLOCK_SIZE);
  Codec.initializeCodecs(mrCluster.getConfig());
  EncodingPolicy policy = new EncodingPolicy("src", (short) 1);
  encodingManager.encodeFile(policy, testFile, parityFile, false);

  MapReduceEncodingManager recoveredManager =
      new MapReduceEncodingManager(mrCluster.getConfig());
  List<Report> reports = recoveredManager.computeReports();
  Assert.assertEquals(1, reports.size());
  Assert.assertNotSame("Encoding failed.", Report.Status.FAILED,
      reports.get(0).getStatus());

  while ((reports = recoveredManager.computeReports()).size() > 0) {
    Assert.assertNotSame("Encoding failed.", Report.Status.FAILED,
        reports.get(0).getStatus());
    Thread.sleep(1000);
  }

  FileStatus parityStatus = dfs.getFileStatus(parityFile);
  Assert.assertEquals(parityStatus.getLen(), 6 * DFS_TEST_BLOCK_SIZE);
  try {
    FSDataInputStream in = dfs.open(parityFile);
    byte[] buff = new byte[6 * DFS_TEST_BLOCK_SIZE];
    in.readFully(0, buff);
  } catch (BlockMissingException e) {
    LOG.error("Reading parity failed", e);
    Assert.fail("Parity could not be read.");
  }
}
/**
 * Builds (codec.stripeLength + codec.parityLength) inputs given some erased
 * locations.
 * Outputs:
 *  - the array of input streams @param inputs
 *  - the list of erased locations @param erasedLocations.
 *  - the list of locations that are read @param locationsToRead.
 */
public InputStream[] buildInputs(
    FileSystem srcFs, Path srcFile, FileStatus srcStat,
    FileSystem parityFs, Path parityFile, FileStatus parityStat,
    int stripeIdx, long offsetInBlock,
    List<Integer> erasedLocations, List<Integer> locationsToRead,
    ErasureCode code) throws IOException {
  InputStream[] inputs =
      new InputStream[codec.stripeLength + codec.parityLength];
  boolean redo = false;
  do {
    /*
     * In the first iteration locationsToRead is empty.
     * It is populated according to locationsToReadForDecode.
     * In consecutive iterations (if a stream failed to open)
     * the list is cleared and re-populated.
     */
    redo = false;  // Reset so the loop terminates once every stream opens successfully.
    locationsToRead.clear();
    locationsToRead.addAll(code.locationsToReadForDecode(erasedLocations));
    for (int i = 0; i < inputs.length; i++) {
      boolean isErased = (erasedLocations.indexOf(i) != -1);
      boolean shouldRead = (locationsToRead.indexOf(i) != -1);
      try {
        InputStream stm = null;
        if (isErased || !shouldRead) {
          if (isErased) {
            LOG.info("Location " + i + " is erased, using zeros");
          } else {
            LOG.info("Location " + i + " need not be read, using zeros");
          }
          stm = new RaidUtils.ZeroInputStream(srcStat.getBlockSize() *
              ((i < codec.parityLength)
                  ? stripeIdx * codec.parityLength + i
                  : stripeIdx * codec.stripeLength + i - codec.parityLength));
        } else {
          stm = buildOneInput(i, offsetInBlock, srcFs, srcFile, srcStat,
              parityFs, parityFile, parityStat);
        }
        inputs[i] = stm;
      } catch (IOException e) {
        if (e instanceof BlockMissingException ||
            e instanceof ChecksumException) {
          erasedLocations.add(i);
          redo = true;
          RaidUtils.closeStreams(inputs);
          break;
        } else {
          throw e;
        }
      }
    }
  } while (redo);
  return inputs;
}
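The only non-obvious part of buildInputs is the byte offset handed to ZeroInputStream: locations below codec.parityLength map to parity blocks of the current stripe, the remaining locations to source blocks. The following standalone sketch only illustrates that mapping; the concrete parityLength, stripeLength, block size, and stripe index are made-up values, not taken from any codec configuration above.

// Hypothetical, self-contained illustration of the location-to-offset arithmetic
// used by buildInputs(). All constants below are assumptions for demonstration.
public class StripeOffsetSketch {
  public static void main(String[] args) {
    final int parityLength = 2;               // assumed codec.parityLength
    final int stripeLength = 4;               // assumed codec.stripeLength
    final long blockSize = 64L * 1024 * 1024; // assumed block size
    final int stripeIdx = 3;                  // arbitrary stripe index

    for (int i = 0; i < parityLength + stripeLength; i++) {
      // Locations [0, parityLength) are parity blocks, the rest are source blocks.
      long blockIdx = (i < parityLength)
          ? (long) stripeIdx * parityLength + i
          : (long) stripeIdx * stripeLength + i - parityLength;
      System.out.println("location " + i + " -> block " + blockIdx
          + ", byte offset " + blockIdx * blockSize);
    }
  }
}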
@Test
public void testReadBrokenFile() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem)
      ((ErasureCodingFileSystem) getFileSystem()).getFileSystem();
  TestDfsClient testDfsClient = new TestDfsClient(getConfig());
  testDfsClient.injectIntoDfs(dfs);

  Codec.initializeCodecs(getConfig());
  EncodingPolicy policy = new EncodingPolicy("src", (short) 1);
  Util.createRandomFile(dfs, testFile, seed, TEST_BLOCK_COUNT,
      DFS_TEST_BLOCK_SIZE, policy);
  FileStatus testFileStatus = dfs.getFileStatus(testFile);

  // Busy waiting until the encoding is done
  while (!dfs.getEncodingStatus(testFile.toUri().getPath()).isEncoded()) {
    Thread.sleep(1000);
  }

  String path = testFileStatus.getPath().toUri().getPath();
  int blockToLoose = new Random(seed).nextInt(
      (int) (testFileStatus.getLen() / testFileStatus.getBlockSize()));
  LocatedBlock lb = dfs.getClient().getLocatedBlocks(path, 0, Long.MAX_VALUE)
      .get(blockToLoose);
  DataNodeUtil.loseBlock(getCluster(), lb);
  List<LocatedBlock> lostBlocks = new ArrayList<LocatedBlock>();
  lostBlocks.add(lb);
  LocatedBlocks locatedBlocks =
      new LocatedBlocks(0, false, lostBlocks, null, true);
  testDfsClient.setMissingLocatedBlocks(locatedBlocks);
  LOG.info("Losing block " + lb.toString());
  getCluster().triggerBlockReports();

  ErasureCodingFileSystem ecfs = (ErasureCodingFileSystem) getFileSystem();
  NameNode nameNode = getCluster().getNameNode();
  ecfs.initialize(nameNode.getUri(nameNode.getServiceRpcAddress()), conf);
  try {
    FSDataInputStream in = ecfs.open(testFile);
    byte[] buff = new byte[TEST_BLOCK_COUNT * DFS_TEST_BLOCK_SIZE];
    in.readFully(0, buff);
  } catch (BlockMissingException e) {
    Assert.fail("Repair failed. Missing a block.");
  }
}
@Test
public void testCorruptRepair() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem)
      ((ErasureCodingFileSystem) getFileSystem()).getFileSystem();
  TestDfsClient testDfsClient = new TestDfsClient(getConfig());
  testDfsClient.injectIntoDfs(dfs);

  Codec.initializeCodecs(getConfig());
  EncodingPolicy policy = new EncodingPolicy("src", (short) 1);
  Util.createRandomFile(dfs, testFile, seed, TEST_BLOCK_COUNT,
      DFS_TEST_BLOCK_SIZE, policy);
  FileStatus testFileStatus = dfs.getFileStatus(testFile);

  // Busy waiting until the encoding is done
  while (!dfs.getEncodingStatus(testFile.toUri().getPath()).isEncoded()) {
    Thread.sleep(1000);
  }

  String path = testFileStatus.getPath().toUri().getPath();
  int blockToLoose = new Random(seed).nextInt(
      (int) (testFileStatus.getLen() / testFileStatus.getBlockSize()));
  final LocatedBlock lb = dfs.getClient()
      .getLocatedBlocks(path, 0, Long.MAX_VALUE)
      .get(blockToLoose);
  DataNodeUtil.loseBlock(getCluster(), lb);
  List<LocatedBlock> lostBlocks = new ArrayList<LocatedBlock>();
  lostBlocks.add(lb);
  LocatedBlocks locatedBlocks =
      new LocatedBlocks(0, false, lostBlocks, null, true);
  testDfsClient.setMissingLocatedBlocks(locatedBlocks);
  LOG.info("Losing block " + lb.toString());
  getCluster().triggerBlockReports();

  final int inodeId =
      io.hops.TestUtil.getINodeId(cluster.getNameNode(), testFile);
  new LightWeightRequestHandler(HDFSOperationType.TEST) {
    @Override
    public Object performTask() throws IOException {
      BlockChecksumDataAccess da = (BlockChecksumDataAccess)
          HdfsStorageFactory.getDataAccess(BlockChecksumDataAccess.class);
      da.update(new BlockChecksum(inodeId,
          (int) (lb.getStartOffset() / lb.getBlockSize()), 0));
      return null;
    }
  }.handle();

  ErasureCodingFileSystem ecfs = (ErasureCodingFileSystem) getFileSystem();
  NameNode nameNode = getCluster().getNameNode();
  ecfs.initialize(nameNode.getUri(nameNode.getServiceRpcAddress()), conf);
  try {
    FSDataInputStream in = ecfs.open(testFile);
    byte[] buff = new byte[TEST_BLOCK_COUNT * DFS_TEST_BLOCK_SIZE];
    in.readFully(0, buff);
    Assert.fail("Read succeeded with bogus checksum");
  } catch (BlockMissingException e) {
    // Expected: with a bogus checksum the block cannot be repaired, so it stays missing.
  }
}
@Test
public void testBlockRepair() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem) getFileSystem();
  MapReduceEncodingManager encodingManager =
      new MapReduceEncodingManager(conf);

  Util.createRandomFile(dfs, testFile, seed, TEST_BLOCK_COUNT,
      DFS_TEST_BLOCK_SIZE);
  Codec.initializeCodecs(conf);
  FileStatus testFileStatus = dfs.getFileStatus(testFile);
  EncodingPolicy policy = new EncodingPolicy("src", (short) 1);
  encodingManager.encodeFile(policy, testFile, parityFile, false);

  // Busy waiting until the encoding is done
  List<Report> reports;
  while ((reports = encodingManager.computeReports()).size() > 0) {
    Assert.assertNotSame(Report.Status.FAILED, reports.get(0).getStatus());
    Thread.sleep(1000);
  }

  addEncodingStatus(testFile, policy);

  String path = testFileStatus.getPath().toUri().getPath();
  int blockToLoose = new Random(seed).nextInt(
      (int) (testFileStatus.getLen() / testFileStatus.getBlockSize()));
  LocatedBlock lb = dfs.getClient().getLocatedBlocks(path, 0, Long.MAX_VALUE)
      .get(blockToLoose);
  DataNodeUtil.loseBlock(getCluster(), lb);
  List<LocatedBlock> lostBlocks = new ArrayList<LocatedBlock>();
  lostBlocks.add(lb);
  LOG.info("Losing block " + lb.toString());
  getCluster().triggerBlockReports();

  MapReduceBlockRepairManager repairManager =
      new MapReduceBlockRepairManager(conf);
  repairManager.repairSourceBlocks("src", testFile, parityFile);

  while ((reports = repairManager.computeReports()).size() > 0) {
    Assert.assertNotSame("Repair failed.", Report.Status.FAILED,
        reports.get(0).getStatus());
    Thread.sleep(1000);
  }

  try {
    FSDataInputStream in = dfs.open(testFile);
    byte[] buff = new byte[TEST_BLOCK_COUNT * DFS_TEST_BLOCK_SIZE];
    in.readFully(0, buff);
  } catch (BlockMissingException e) {
    Assert.fail("Repair failed. Missing a block.");
  }
}
@Test
public void testCorruptedRepair() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem) getFileSystem();
  MapReduceEncodingManager encodingManager =
      new MapReduceEncodingManager(conf);

  Util.createRandomFile(dfs, testFile, seed, TEST_BLOCK_COUNT,
      DFS_TEST_BLOCK_SIZE);
  Codec.initializeCodecs(conf);
  FileStatus testFileStatus = dfs.getFileStatus(testFile);
  EncodingPolicy policy = new EncodingPolicy("src", (short) 1);
  encodingManager.encodeFile(policy, testFile, parityFile, false);

  // Busy waiting until the encoding is done
  List<Report> reports;
  while ((reports = encodingManager.computeReports()).size() > 0) {
    Assert.assertNotSame(Report.Status.FAILED, reports.get(0).getStatus());
    Thread.sleep(1000);
  }

  addEncodingStatus(testFile, policy);

  String path = testFileStatus.getPath().toUri().getPath();
  int blockToLoose = new Random(seed).nextInt(
      (int) (testFileStatus.getLen() / testFileStatus.getBlockSize()));
  final LocatedBlock lb = dfs.getClient()
      .getLocatedBlocks(path, 0, Long.MAX_VALUE)
      .get(blockToLoose);
  DataNodeUtil.loseBlock(getCluster(), lb);
  List<LocatedBlock> lostBlocks = new ArrayList<LocatedBlock>();
  lostBlocks.add(lb);
  LOG.info("Losing block " + lb.toString());
  getCluster().triggerBlockReports();

  final int inodeId =
      io.hops.TestUtil.getINodeId(cluster.getNameNode(), testFile);
  new LightWeightRequestHandler(HDFSOperationType.TEST) {
    @Override
    public Object performTask() throws IOException {
      BlockChecksumDataAccess da = (BlockChecksumDataAccess)
          HdfsStorageFactory.getDataAccess(BlockChecksumDataAccess.class);
      da.update(new BlockChecksum(inodeId,
          (int) (lb.getStartOffset() / lb.getBlockSize()), 0));
      return null;
    }
  }.handle();

  MapReduceBlockRepairManager repairManager =
      new MapReduceBlockRepairManager(conf);
  repairManager.repairSourceBlocks("src", testFile, parityFile);

  Report lastReport = null;
  while ((reports = repairManager.computeReports()).size() > 0) {
    Thread.sleep(1000);
    lastReport = reports.get(0);
  }
  Assert.assertEquals(Report.Status.FAILED, lastReport.getStatus());

  try {
    FSDataInputStream in = dfs.open(testFile);
    byte[] buff = new byte[TEST_BLOCK_COUNT * DFS_TEST_BLOCK_SIZE];
    in.readFully(0, buff);
    Assert.fail("Repair succeeded with bogus checksum.");
  } catch (BlockMissingException e) {
    // Expected: the repair must not reconstruct a block whose checksum does not match.
  }
}
@Test
public void testFailover() throws IOException, InterruptedException {
  DistributedFileSystem dfs = (DistributedFileSystem) getFileSystem();
  MapReduceEncodingManager encodingManager =
      new MapReduceEncodingManager(conf);

  Util.createRandomFile(dfs, testFile, seed, TEST_BLOCK_COUNT,
      DFS_TEST_BLOCK_SIZE);
  Codec.initializeCodecs(conf);
  FileStatus testFileStatus = dfs.getFileStatus(testFile);
  EncodingPolicy policy = new EncodingPolicy("src", (short) 1);
  encodingManager.encodeFile(policy, testFile, parityFile, false);

  // Busy waiting until the encoding is done
  List<Report> reports;
  while ((reports = encodingManager.computeReports()).size() > 0) {
    Assert.assertNotSame(Report.Status.FAILED, reports.get(0).getStatus());
    Thread.sleep(1000);
  }

  addEncodingStatus(testFile, policy);

  String path = testFileStatus.getPath().toUri().getPath();
  int blockToLoose = new Random(seed).nextInt(
      (int) (testFileStatus.getLen() / testFileStatus.getBlockSize()));
  LocatedBlock lb = dfs.getClient().getLocatedBlocks(path, 0, Long.MAX_VALUE)
      .get(blockToLoose);
  DataNodeUtil.loseBlock(getCluster(), lb);
  List<LocatedBlock> lostBlocks = new ArrayList<LocatedBlock>();
  lostBlocks.add(lb);
  LOG.info("Losing block " + lb.toString());
  getCluster().triggerBlockReports();

  MapReduceBlockRepairManager repairManager =
      new MapReduceBlockRepairManager(mrCluster.getConfig());
  repairManager.repairSourceBlocks("src", testFile, parityFile);

  MapReduceBlockRepairManager recoveredManager =
      new MapReduceBlockRepairManager(mrCluster.getConfig());
  reports = recoveredManager.computeReports();
  Assert.assertEquals(1, reports.size());
  Assert.assertNotSame("Repair failed.", Report.Status.FAILED,
      reports.get(0).getStatus());

  while ((reports = recoveredManager.computeReports()).size() > 0) {
    Assert.assertNotSame("Repair failed.", Report.Status.FAILED,
        reports.get(0).getStatus());
    Thread.sleep(1000);
  }

  try {
    FSDataInputStream in = dfs.open(testFile);
    byte[] buff = new byte[TEST_BLOCK_COUNT * DFS_TEST_BLOCK_SIZE];
    in.readFully(0, buff);
  } catch (BlockMissingException e) {
    Assert.fail("Repair failed. Missing a block.");
  }
}