/** * This method is valid only if the data nodes have simulated data * @param dataNodeIndex - data node i which to inject - the index is same as for getDataNodes() * @param blocksToInject - the blocks * @param bpid - (optional) the block pool id to use for injecting blocks. * If not supplied then it is queried from the in-process NameNode. * @throws IOException * if not simulatedFSDataset * if any of blocks already exist in the data node * */ public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject, String bpid) throws IOException { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } final DataNode dn = dataNodes.get(dataNodeIndex).datanode; final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } if (bpid == null) { bpid = getNamesystem().getBlockPoolId(); } SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; sdataset.injectBlocks(bpid, blocksToInject); dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0); }
/** * FileNotFoundException is expected for appending to a non-exisiting file * * @throws FileNotFoundException as the result */ @Test(expected = FileNotFoundException.class) public void testFileNotFound() throws IOException { Configuration conf = new HdfsConfiguration(); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fs = cluster.getFileSystem(); try { Path file1 = new Path("/nonexistingfile.dat"); fs.append(file1); } finally { fs.close(); cluster.shutdown(); } }
private void checkFile(FileSystem fileSys, Path name) throws IOException { BlockLocation[] locations = fileSys.getFileBlockLocations( fileSys.getFileStatus(name), 0, fileSize); assertEquals("Number of blocks", fileSize, locations.length); FSDataInputStream stm = fileSys.open(name); byte[] expected = new byte[fileSize]; if (simulatedStorage) { for (int i = 0; i < expected.length; ++i) { expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE; } } else { Random rand = new Random(seed); rand.nextBytes(expected); } // do a sanity check. Read the file byte[] actual = new byte[fileSize]; stm.readFully(0, actual); checkAndEraseData(actual, 0, expected, "Read Sanity Test"); stm.close(); }
/** * Tests small block size in in DFS. */ @Test public void testSmallBlock() throws IOException { Configuration conf = new HdfsConfiguration(); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } conf.set(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, "1"); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fileSys = cluster.getFileSystem(); try { Path file1 = new Path("smallblocktest.dat"); writeFile(fileSys, file1); checkFile(fileSys, file1); cleanupFile(fileSys, file1); } finally { fileSys.close(); cluster.shutdown(); } }
/** * Multiple-NameNode version of injectBlocks. */ public void injectBlocks(int nameNodeIndex, int dataNodeIndex, Iterable<Block> blocksToInject) throws IOException { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } final DataNode dn = dataNodes.get(dataNodeIndex).datanode; final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } String bpid = getNamesystem(nameNodeIndex).getBlockPoolId(); SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; sdataset.injectBlocks(bpid, blocksToInject); dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0); }
@Before public void setUp() throws Exception { this.conf = new Configuration(); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } // lower heartbeat interval for fast recognition of DN death conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000); // handle under-replicated blocks quickly (for replication asserts) conf.setInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); // handle failures in the DFSClient pipeline quickly // (for cluster.shutdown(); fs.close() idiom) conf.setInt("ipc.client.connect.max.retries", 1); }
private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum) throws IOException { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096); // Set short retry timeouts so this test runs faster conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } if (disableTransferTo) { conf.setBoolean("dfs.datanode.transferTo.allowed", false); } MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); FileSystem fileSys = cluster.getFileSystem(); fileSys.setVerifyChecksum(verifyChecksum); try { Path file1 = new Path("preadtest.dat"); writeFile(fileSys, file1); pReadFile(fileSys, file1); datanodeRestartTest(cluster, fileSys, file1); cleanupFile(fileSys, file1); } finally { fileSys.close(); cluster.shutdown(); } }
/** * Tests small block size in in DFS. */ @Test public void testSmallBlock() throws IOException { Configuration conf = new HdfsConfiguration(); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } conf.set(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, "1"); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); DistributedFileSystem fileSys = cluster.getFileSystem(); try { Path file1 = new Path("/smallblocktest.dat"); DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize, (short) 1, seed); checkFile(fileSys, file1); cleanupFile(fileSys, file1); } finally { fileSys.close(); cluster.shutdown(); } }
@Before public void setUp() throws Exception { this.conf = new Configuration(); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } // lower heartbeat interval for fast recognition of DN death conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000); // handle under-replicated blocks quickly (for replication asserts) conf.setInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); // handle failures in the DFSClient pipeline quickly // (for cluster.shutdown(); fs.close() idiom) conf.setInt("ipc.client.connect.max.retries", 1); }
/** * Test file creation using createNonRecursive(). */ @Test public void testFileCreationNonRecursive() throws IOException { Configuration conf = new HdfsConfiguration(); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fs = cluster.getFileSystem(); try { testFileCreationNonRecursive(fs); } finally { fs.close(); cluster.shutdown(); } }
private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum) throws IOException { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, 4096); // Set short retry timeouts so this test runs faster conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } if (disableTransferTo) { conf.setBoolean("dfs.datanode.transferTo.allowed", false); } MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); FileSystem fileSys = cluster.getFileSystem(); fileSys.setVerifyChecksum(verifyChecksum); try { Path file1 = new Path("/preadtest.dat"); writeFile(fileSys, file1); pReadFile(fileSys, file1); datanodeRestartTest(cluster, fileSys, file1); cleanupFile(fileSys, file1); } finally { fileSys.close(); cluster.shutdown(); } }
private void checkContent(FileSystem fileSys, Path name, int length) throws IOException { FSDataInputStream stm = fileSys.open(name); byte[] expected = new byte[length]; if (simulatedStorage) { for (int i= 0; i < expected.length; i++) { expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE; } } else { for (int i= 0; i < expected.length; i++) { expected[i] = fileContents[i]; } } // do a sanity check. Read the file byte[] actual = new byte[length]; stm.readFully(0, actual); checkData(actual, 0, expected, "Read 1"); }
@Override public void setUp() throws Exception { this.conf = new Configuration(); if (simulatedStorage) { conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); } conf.setBoolean("dfs.support.append", true); // lower heartbeat interval for fast recognition of DN death conf.setInt("heartbeat.recheck.interval", 1000); conf.setInt("dfs.heartbeat.interval", 1); conf.setInt("dfs.socket.timeout", 5000); // handle under-replicated blocks quickly (for replication asserts) // conf.set("dfs.replication.pending.timeout.sec", Integer.toString(5)); conf.setInt("dfs.replication.pending.timeout.sec", 5); conf.setInt("dfs.replication.interval", 1); // handle failures in the DFSClient pipeline quickly // (for cluster.shutdown(); fs.close() idiom) conf.setInt("ipc.client.connect.max.retries", 1); conf.setInt("dfs.client.block.recovery.retries", 1); // Delay blockReceived calls from DNs to be more similar to a real // cluster. 10ms is enough so that client often gets there first. conf.setInt("dfs.datanode.artificialBlockReceivedDelay", 10); }
int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId) throws IOException { int bytesAdded = 0; for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) { Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written BlockDataFile.Writer dataOut = ((SimulatedFSDataset.SimulatedBlockInlineChecksumFileWriter) fsdataset .writeToBlock(0, b, b, false, false, -1, -1)).getBlockDataFile() .getWriter(0); assertEquals(0, fsdataset.getFinalizedBlockLength(0,b)); for (int j=1; j <= blockIdToLen(i); ++j) { dataOut.write(new byte[] {(byte)j}); assertEquals(j, fsdataset.getFinalizedBlockLength(0,b)); // correct length even as we write bytesAdded++; } dataOut.close(); b.setNumBytes(blockIdToLen(i)); fsdataset.finalizeBlock(0,b); assertEquals(blockIdToLen(i), fsdataset.getFinalizedBlockLength(0,b)); } return bytesAdded; }
void checkBlockDataAndSize(FSDatasetInterface fsdataset, Block b, long expectedLen) throws IOException { ReplicaToRead replica = fsdataset.getReplicaToRead(0, b); InputStream input = replica.getBlockInputStream(null, 0); long lengthRead = 0; int data; int count = 0; while ((data = input.read()) != -1) { if (count++ < BlockInlineChecksumReader.getHeaderSize()) { continue; } assertEquals(SimulatedFSDataset.DEFAULT_DATABYTE, data); lengthRead++; } assertEquals(expectedLen, lengthRead); }
public void testInvalidate() throws IOException { FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); int bytesAdded = addSomeBlocks(fsdataset); Block[] deleteBlocks = new Block[2]; deleteBlocks[0] = new Block(1, 0, 0); deleteBlocks[1] = new Block(2, 0, 0); fsdataset.invalidate(0,deleteBlocks); checkInvalidBlock(deleteBlocks[0]); checkInvalidBlock(deleteBlocks[1]); long sizeDeleted = blockIdToLen(1) + blockIdToLen(2); assertEquals(bytesAdded-sizeDeleted, fsdataset.getDfsUsed()); assertEquals(fsdataset.getCapacity()-bytesAdded+sizeDeleted, fsdataset.getRemaining()); // Now make sure the rest of the blocks are valid for (int i=3; i <= NUMBLOCKS; ++i) { Block b = new Block(i, 0, 0); assertTrue(fsdataset.isValidBlock(0, b, false)); } }
/** * Multiple-NameNode version of {@link #injectBlocks(Iterable[])}. */ public void injectBlocks(int nameNodeIndex, int dataNodeIndex, Iterable<Block> blocksToInject) throws IOException { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } final DataNode dn = dataNodes.get(dataNodeIndex).datanode; final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } String bpid = getNamesystem(nameNodeIndex).getBlockPoolId(); SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; sdataset.injectBlocks(bpid, blocksToInject); dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0); }
private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum) throws IOException { Configuration conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096); if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); } if (disableTransferTo) { conf.setBoolean("dfs.datanode.transferTo.allowed", false); } MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); FileSystem fileSys = cluster.getFileSystem(); fileSys.setVerifyChecksum(verifyChecksum); try { Path file1 = new Path("preadtest.dat"); writeFile(fileSys, file1); pReadFile(fileSys, file1); datanodeRestartTest(cluster, fileSys, file1); cleanupFile(fileSys, file1); } finally { fileSys.close(); cluster.shutdown(); } }