/**
 * This method is valid only if the data nodes have simulated data.
 *
 * @param dataNodeIndex - index of the data node to inject into - the index
 *          is the same as for getDataNodes()
 * @param blocksToInject - the blocks
 * @param bpid - (optional) the block pool id to use for injecting blocks.
 *          If not supplied then it is queried from the in-process NameNode.
 * @throws IOException
 *           if not a SimulatedFSDataset, or if any of the blocks already
 *           exist in the data node
 */
public void injectBlocks(int dataNodeIndex,
    Iterable<Block> blocksToInject, String bpid) throws IOException {
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException();
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException("injectBlocks is valid only for SimulatedFSDataset");
  }
  if (bpid == null) {
    bpid = getNamesystem().getBlockPoolId();
  }
  SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
}
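// A minimal usage sketch (not from the source): how a test might call
// injectBlocks above. It assumes the cluster was built with the simulated
// dataset enabled (e.g. SimulatedFSDataset.setFactory(conf) before building
// the MiniDFSCluster); the block ids and lengths below are made-up values.
List<Block> blocks = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
  // Hypothetical block id and length; LAST_RESERVED_STAMP is the stamp
  // commonly used for injected blocks in simulated-storage tests.
  blocks.add(new Block(i, 512, GenerationStamp.LAST_RESERVED_STAMP));
}
// Inject into the first DataNode; passing a null bpid resolves the block
// pool id from the in-process NameNode, as documented above.
cluster.injectBlocks(0, blocks, null);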
/**
 * Multiple-NameNode version of injectBlocks.
 */
public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
    Iterable<Block> blocksToInject) throws IOException {
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException();
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException("injectBlocks is valid only for SimulatedFSDataset");
  }
  String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
  SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
}
/**
 * Checks a DataNode for correct reporting of failed volumes.
 *
 * @param dn DataNode to check
 * @param expectedVolumeFailuresCounter metric counter value for
 *          VolumeFailures. The current implementation actually counts the
 *          number of failed disk checker cycles, which may be different from
 *          the length of expectedFailedVolumes if multiple disks fail in the
 *          same disk checker cycle
 * @param expectCapacityKnown if true, then expect that the capacities of the
 *          volumes were known before the failures, and therefore the lost
 *          capacity can be reported
 * @param expectedFailedVolumes expected locations of failed volumes
 * @throws Exception if there is any failure
 */
private void checkFailuresAtDataNode(DataNode dn,
    long expectedVolumeFailuresCounter, boolean expectCapacityKnown,
    String... expectedFailedVolumes) throws Exception {
  assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
      getMetrics(dn.getMetrics().name()));
  FsDatasetSpi<?> fsd = dn.getFSDataset();
  assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
  assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
  if (expectedFailedVolumes.length > 0) {
    assertTrue(fsd.getLastVolumeFailureDate() > 0);
    long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
        expectedFailedVolumes.length);
    assertEquals(expectedCapacityLost, fsd.getEstimatedCapacityLostTotal());
  } else {
    assertEquals(0, fsd.getLastVolumeFailureDate());
    assertEquals(0, fsd.getEstimatedCapacityLostTotal());
  }
}
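// Illustrative call (assumed names, not from the source): after failing one
// volume on the first DataNode, a test might assert that a single failure
// cycle was counted and that the lost capacity is reported.
// "failedVolumePath" is a hypothetical data directory path.
checkFailuresAtDataNode(cluster.getDataNodes().get(0),
    1, true, failedVolumePath);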
@Test
public void testReleaseOnFileDeletion()
    throws IOException, TimeoutException, InterruptedException {
  getClusterBuilder().setNumDatanodes(1)
      .setMaxLockedMemory(BLOCK_SIZE).build();
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();

  Path path = new Path("/" + METHOD_NAME + ".dat");
  makeTestFile(path, BLOCK_SIZE, true);
  ensureFileReplicasOnStorageType(path, RAM_DISK);
  assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));

  // Delete the file and ensure that the locked memory is released.
  fs.delete(path, false);
  DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
  waitForLockedBytesUsed(fsd, 0);
}
/**
 * Verify that locked bytes are correctly updated when a block is finalized
 * at less than its max length.
 */
@Test
public void testShortBlockFinalized()
    throws IOException, TimeoutException, InterruptedException {
  getClusterBuilder().setNumDatanodes(1).build();
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();

  Path path = new Path("/" + METHOD_NAME + ".dat");
  makeTestFile(path, 1, true);
  assertThat(fsd.getCacheUsed(), is(osPageSize));

  // Delete the file and ensure locked RAM usage goes to zero.
  fs.delete(path, false);
  waitForLockedBytesUsed(fsd, 0);
}
/**
 * Wait until the used locked byte count reaches the expected value.
 * @throws TimeoutException after 300 seconds.
 */
private void waitForLockedBytesUsed(final FsDatasetSpi<?> fsd,
    final long expectedLockedBytes)
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      long cacheUsed = fsd.getCacheUsed();
      LOG.info("cacheUsed=" + cacheUsed +
          ", waiting for it to be " + expectedLockedBytes);
      if (cacheUsed < 0) {
        throw new IllegalStateException("cacheUsed unexpectedly negative");
      }
      return (cacheUsed == expectedLockedBytes);
    }
  }, 1000, 300000);
}
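// Equivalent sketch with a lambda replacing the anonymous Supplier. On
// branches where GenericTestUtils.waitFor accepts a functional interface,
// the wait condition can be written more compactly (assumption: fsd and
// expectedLockedBytes are effectively final; the logging and the
// negative-value check above are dropped here for brevity).
GenericTestUtils.waitFor(
    () -> fsd.getCacheUsed() == expectedLockedBytes,
    1000, 300000);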
/**
 * Start a MiniDFSCluster and optionally cap the capacity of its first volume.
 *
 * @param blockSize block size to configure for the cluster
 * @param numDatanodes number of DataNodes to start
 * @param perVolumeCapacity limit the capacity of each volume to the given
 *          value. If negative, then don't limit. (The current implementation
 *          only caps the first volume of the first DataNode.)
 * @throws IOException
 */
private void startCluster(int blockSize, int numDatanodes,
    long perVolumeCapacity) throws IOException {
  initConfig(blockSize);

  cluster = new MiniDFSCluster
      .Builder(conf)
      .storagesPerDatanode(STORAGES_PER_DATANODE)
      .numDataNodes(numDatanodes)
      .build();
  fs = cluster.getFileSystem();
  client = fs.getClient();
  cluster.waitActive();

  if (perVolumeCapacity >= 0) {
    try (FsDatasetSpi.FsVolumeReferences volumes =
        cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
      singletonVolumeRef = volumes.get(0).obtainReference();
    }
    singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
    singletonVolume.setCapacityForTesting(perVolumeCapacity);
  }
}
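// Hypothetical usage (assumed constants): a one-DataNode cluster whose first
// volume is capped at two blocks, so writes beyond that should exercise the
// out-of-space path. BLOCK_SIZE is assumed to be defined by the enclosing
// test class.
startCluster(BLOCK_SIZE, 1, 2L * BLOCK_SIZE);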
@Test
public void testClose() throws Exception {
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
  try {
    cluster.waitActive();
    DataNode dn = cluster.getDataNodes().get(0);
    FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);

    // set up replicasMap
    String bpid = cluster.getNamesystem().getBlockPoolId();
    ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));

    // test close
    testClose(dataSet, blocks);
  } finally {
    cluster.shutdown();
  }
}
@Test
public void testAppend() throws Exception {
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
  try {
    cluster.waitActive();
    DataNode dn = cluster.getDataNodes().get(0);
    FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);

    // set up replicasMap
    String bpid = cluster.getNamesystem().getBlockPoolId();
    ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));

    // test append
    testAppend(bpid, dataSet, blocks);
  } finally {
    cluster.shutdown();
  }
}
/**
 * Check that the permissions of the local DN directories are as expected.
 */
@Test
public void testLocalDirs() throws Exception {
  Configuration conf = new Configuration();
  final String permStr = conf.get(
      DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY);
  FsPermission expected = new FsPermission(permStr);

  // Check permissions on directories in 'dfs.datanode.data.dir'
  FileSystem localFS = FileSystem.getLocal(conf);
  for (DataNode dn : cluster.getDataNodes()) {
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dn.getFSDataset().getFsVolumeReferences()) {
      for (FsVolumeSpi vol : volumes) {
        String dir = vol.getBasePath();
        Path dataDir = new Path(dir);
        FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
        assertEquals("Permission for dir: " + dataDir + ", is " + actual +
            ", while expected is " + expected, expected, actual);
      }
    }
  }
}
@Test(timeout = 60000)
public void testVerifyBlockChecksumCommand() throws Exception {
  DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
  FsDatasetSpi<?> fsd = datanode.getFSDataset();
  ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
  File blockFile = getBlockFile(fsd,
      block.getBlockPoolId(), block.getLocalBlock());
  assertEquals("ret: 1, You must specify a meta file with -meta\n",
      runCmd(new String[]{"verify", "-block", blockFile.getAbsolutePath()}));

  File metaFile = getMetaFile(fsd,
      block.getBlockPoolId(), block.getLocalBlock());
  assertEquals("ret: 0, Checksum type: " +
        "DataChecksum(type=CRC32C, chunkSize=512)\n",
      runCmd(new String[]{"verify", "-meta", metaFile.getAbsolutePath()}));
  assertEquals("ret: 0, Checksum type: " +
        "DataChecksum(type=CRC32C, chunkSize=512)\n" +
        "Checksum verification succeeded on block file " +
        blockFile.getAbsolutePath() + "\n",
      runCmd(new String[]{"verify", "-meta", metaFile.getAbsolutePath(),
          "-block", blockFile.getAbsolutePath()}));
}
/**
 * Initializes the {@link #data}. The initialization is done only once, when
 * handshake with the first namenode is completed.
 */
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
  final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory =
      FsDatasetSpi.Factory.getFactory(conf);

  if (!factory.isSimulated()) {
    final StartupOption startOpt = getStartupOption(conf);
    if (startOpt == null) {
      throw new IOException("Startup option not set.");
    }
    final String bpid = nsInfo.getBlockPoolID();
    // read storage info, lock data dirs and transition fs state if necessary
    storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
    final StorageInfo bpStorage = storage.getBPStorage(bpid);
    LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
        + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
        + ";nsInfo=" + nsInfo);
  }

  synchronized (this) {
    if (data == null) {
      data = factory.newInstance(this, storage, conf);
    }
  }
}
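// Contextual sketch (inferred from the factory.isSimulated() branch above):
// tests that want initStorage to skip on-disk storage recovery install the
// simulated dataset factory in the configuration before DataNode startup.
Configuration conf = new HdfsConfiguration();
// Registers a factory whose isSimulated() returns true, so no data
// directories are read or locked.
SimulatedFSDataset.setFactory(conf);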
/**
 * This method is valid only if the data nodes have simulated data.
 *
 * @param dataNodeIndex
 *          index of the data node to inject into - the index is the same as
 *          for getDataNodes()
 * @param blocksToInject
 *          the blocks
 * @throws IOException
 *           if not a SimulatedFSDataset, or if any of the blocks already
 *           exist in the data node
 */
public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject)
    throws IOException {
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException();
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException(
        "injectBlocks is valid only for SimulatedFSDataset");
  }
  String bpid = getNamesystem().getBlockPoolId();
  SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
}
/**
 * @return the FileInputStream for the meta data of the given block.
 * @throws FileNotFoundException
 *           if the file is not found.
 * @throws ClassCastException
 *           if the underlying input stream is not a FileInputStream.
 */
public static FileInputStream getMetaDataInputStream(
    ExtendedBlock b, FsDatasetSpi<?> data) throws IOException {
  final LengthInputStream lin = data.getMetaDataInputStream(b);
  if (lin == null) {
    throw new FileNotFoundException("Meta file for " + b + " not found.");
  }
  return (FileInputStream) lin.getWrappedStream();
}
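// Usage sketch: pairing getMetaDataInputStream with BlockMetadataHeader to
// read the checksum header of a block's meta file. The variable names are
// illustrative, and the caller is responsible for closing the stream.
FileInputStream metaIn = getMetaDataInputStream(block, dataset);
try {
  BlockMetadataHeader header = BlockMetadataHeader.readHeader(
      new DataInputStream(new BufferedInputStream(metaIn)));
  DataChecksum checksum = header.getChecksum();
} finally {
  IOUtils.closeStream(metaIn);
}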
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset,
    Configuration conf) {
  this.datanode = datanode;
  this.dataset = dataset;
  int interval = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
  scanPeriodMsecs = interval * 1000L; // msec

  int threads = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);

  reportCompileThreadPool = Executors.newFixedThreadPool(threads,
      new Daemon.DaemonFactory());
  masterThread = new ScheduledThreadPoolExecutor(1,
      new Daemon.DaemonFactory());
}
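// Configuration sketch for the two knobs the constructor reads above; the
// values are arbitrary examples, not recommended settings.
Configuration conf = new HdfsConfiguration();
// Run a directory scan every 6 hours (the interval is in seconds).
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 6 * 3600);
// Compile per-volume reports with 2 threads.
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 2);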
/** Is the given volume still valid in the dataset? */
private static boolean isValid(final FsDatasetSpi<?> dataset,
    final FsVolumeSpi volume) {
  for (FsVolumeSpi vol : dataset.getVolumes()) {
    if (vol == volume) {
      return true;
    }
  }
  return false;
}
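// Illustrative check (assumed variables): verify that a previously obtained
// volume still belongs to the dataset, e.g. after a hot-swap style volume
// reconfiguration removed some data directories.
if (!isValid(dataset, savedVolume)) {
  // The volume was removed from the dataset; references to it are stale.
  LOG.warn("Volume " + savedVolume + " is no longer part of the dataset");
}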