/** * Restart a datanode, on the same port if requested * @param dnprop the datanode to restart * @param keepPort whether to use the same port * @return true if restarting is successful * @throws IOException */ public synchronized boolean restartDataNode(DataNodeProperties dnprop, boolean keepPort) throws IOException { Configuration conf = dnprop.conf; String[] args = dnprop.dnArgs; SecureResources secureResources = dnprop.secureResources; Configuration newconf = new HdfsConfiguration(conf); // save cloned config if (keepPort) { InetSocketAddress addr = dnprop.datanode.getXferAddress(); conf.set(DFS_DATANODE_ADDRESS_KEY, addr.getAddress().getHostAddress() + ":" + addr.getPort()); conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort); } DataNode newDn = DataNode.createDataNode(args, conf, secureResources); dataNodes.add(new DataNodeProperties( newDn, newconf, args, secureResources, newDn.getIpcPort())); numDataNodes++; return true; }
/**
 * Injects blocks into a datanode whose dataset is a
 * {@code SimulatedFSDataset}, then schedules a full block report on it.
 *
 * @param dataNodeIndex index of the target datanode in this cluster's
 *                      datanode list (same indexing as getDataNodes());
 *                      must be in {@code [0, dataNodes.size())}
 * @param blocksToInject the blocks to inject
 * @param bpid (optional) the block pool id to inject into; when null it is
 *             taken from the in-process NameNode's namesystem
 * @throws IndexOutOfBoundsException if {@code dataNodeIndex} is out of range
 * @throws IOException if the datanode's dataset is not a
 *                     SimulatedFSDataset, or if the simulated dataset
 *                     rejects the injection (e.g. a block already exists)
 */
public void injectBlocks(int dataNodeIndex,
    Iterable<Block> blocksToInject, String bpid) throws IOException {
  // Valid indices stop at size() - 1, so size() itself must be rejected too.
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException("dataNodeIndex " + dataNodeIndex
        + " is out of range [0, " + dataNodes.size() + ")");
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
  }
  final String poolId =
      (bpid == null) ? getNamesystem().getBlockPoolId() : bpid;
  final SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(poolId, blocksToInject);
  // Delay 0: schedule the block report without additional delay.
  dn.scheduleAllBlockReport(0);
}
/** * Create a file with one block and corrupt some/all of the block replicas. */ private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl, int corruptBlockCount) throws IOException, AccessControlException, FileNotFoundException, UnresolvedLinkException, InterruptedException, TimeoutException { DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0); DFSTestUtil.waitReplication(dfs, filePath, repl); // Locate the file blocks by asking name node final LocatedBlocks locatedblocks = dfs.dfs.getNamenode() .getBlockLocations(filePath.toString(), 0L, BLOCK_SIZE); Assert.assertEquals(repl, locatedblocks.get(0).getLocations().length); // The file only has one block LocatedBlock lblock = locatedblocks.get(0); DatanodeInfo[] datanodeinfos = lblock.getLocations(); ExtendedBlock block = lblock.getBlock(); // corrupt some /all of the block replicas for (int i = 0; i < corruptBlockCount; i++) { DatanodeInfo dninfo = datanodeinfos[i]; final DataNode dn = cluster.getDataNode(dninfo.getIpcPort()); corruptBlock(block, dn); LOG.debug("Corrupted block " + block.getBlockName() + " on data node " + dninfo); } }
/**
 * Collects the on-disk file of every block listed in the cluster's block
 * reports for the namesystem's block pool.
 *
 * @param cluster the cluster to inspect
 * @return one file per reported block, grouped by datanode in report order
 * @throws IOException propagated from looking up a block file
 */
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  final List<File> blockFiles = new ArrayList<File>();
  final List<DataNode> dns = cluster.getDataNodes();
  final String bpid = cluster.getNamesystem().getBlockPoolId();
  final List<Map<DatanodeStorage, BlockListAsLongs>> reportsPerDn =
      cluster.getAllBlockReports(bpid);

  // Entry i of the reports is paired with datanode i of the cluster.
  int dnIndex = 0;
  for (Map<DatanodeStorage, BlockListAsLongs> reportsByStorage
      : reportsPerDn) {
    final DataNode dn = dns.get(dnIndex);
    dnIndex++;
    for (BlockListAsLongs reported : reportsByStorage.values()) {
      for (Block b : reported) {
        blockFiles.add(DataNodeTestUtils.getFile(dn, bpid, b.getBlockId()));
      }
    }
  }
  return blockFiles;
}
/**
 * Blocks until no datanode in the cluster reports pending asynchronous
 * block deletions, polling every 1000 ms for up to 10000 ms.
 *
 * @param cluster the cluster whose datanodes are polled
 * @throws TimeoutException propagated from the wait helper
 * @throws InterruptedException propagated from the wait helper
 */
public static void waitForDNDeletions(final MiniDFSCluster cluster)
    throws TimeoutException, InterruptedException {
  final Supplier<Boolean> deletionsDrained = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      for (DataNode dn : cluster.getDataNodes()) {
        final boolean stillPending =
            DataNodeTestUtils.getPendingAsyncDeletions(dn) > 0;
        if (stillPending) {
          return false;
        }
      }
      return true;
    }
  };
  GenericTestUtils.waitFor(deletionsDrained, 1000, 10000);
}
/**
 * Registers a repeating test thread that, on every iteration, triggers a
 * deletion report and a heartbeat on each datanode, computes all pending
 * block work on NameNodes 0 and 1, and then sleeps.
 *
 * @param interval sleep time in milliseconds between iterations
 */
public void addReplicationTriggerThread(final int interval) {
  final RepeatingTestThread trigger = new RepeatingTestThread(testCtx) {
    @Override
    public void doAnAction() throws Exception {
      // Datanode side: push deletion reports and heartbeats.
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerDeletionReport(dn);
        DataNodeTestUtils.triggerHeartbeat(dn);
      }
      // NameNode side: only indices 0 and 1 are visited.
      for (int nnIndex = 0; nnIndex < 2; nnIndex++) {
        final NameNode nn = cluster.getNameNode(nnIndex);
        BlockManagerTestUtil.computeAllPendingWork(
            nn.getNamesystem().getBlockManager());
      }
      Thread.sleep(interval);
    }
  };
  testCtx.addThread(trigger);
}
/**
 * Picks a random port that is not the transfer port of any known datanode
 * and pairs it with the local host address.
 *
 * @return a socket address on the local host with the chosen port
 * @throws UnknownHostException if the local host cannot be resolved
 */
private InetSocketAddress getArbitraryLocalHostAddr()
    throws UnknownHostException {
  final Random rand = new Random(System.currentTimeMillis());
  int port = rand.nextInt(65535);
  boolean conflict;
  // Re-scan all datanodes until a full pass finds no port clash.
  do {
    conflict = false;
    for (DataNode d : datanodes) {
      if (d.getXferAddress().getPort() == port) {
        port = rand.nextInt(65535);
        conflict = true;
      }
    }
  } while (conflict);
  return new InetSocketAddress(InetAddress.getLocalHost(), port);
}
/**
 * Asserts, while holding the namesystem read lock, that the pending-cached
 * list of every datanode's descriptor is empty.
 *
 * @param cluster the cluster whose datanodes are checked
 */
private void checkPendingCachedEmpty(MiniDFSCluster cluster)
    throws Exception {
  cluster.getNamesystem().readLock();
  try {
    final DatanodeManager dnManager =
        cluster.getNamesystem().getBlockManager().getDatanodeManager();
    for (DataNode dn : cluster.getDataNodes()) {
      final DatanodeDescriptor dnDesc =
          dnManager.getDatanode(dn.getDatanodeId());
      final String failureMessage = "Pending cached list of " + dnDesc
          + " is not empty, "
          + Arrays.toString(dnDesc.getPendingCached().toArray());
      Assert.assertTrue(failureMessage, dnDesc.getPendingCached().isEmpty());
    }
  } finally {
    cluster.getNamesystem().readUnlock();
  }
}
private void verifyStats(NameNode namenode, FSNamesystem fsn, DatanodeInfo info, DataNode node, boolean decommissioning) throws InterruptedException, IOException { // Do the stats check over 10 heartbeats for (int i = 0; i < 10; i++) { long[] newStats = namenode.getRpcServer().getStats(); // For decommissioning nodes, ensure capacity of the DN is no longer // counted. Only used space of the DN is counted in cluster capacity assertEquals(newStats[0], decommissioning ? info.getDfsUsed() : info.getCapacity()); // Ensure cluster used capacity is counted for both normal and // decommissioning nodes assertEquals(newStats[1], info.getDfsUsed()); // For decommissioning nodes, remaining space from the DN is not counted assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining()); // Ensure transceiver count is same as that DN assertEquals(fsn.getTotalLoad(), info.getXceiverCount()); DataNodeTestUtils.triggerHeartbeat(node); } }
/**
 * Builds a mini DFS cluster rooted at a freshly deleted working directory
 * and logs the addresses of its NameNode and datanodes.
 *
 * @return the started cluster
 * @throws HDFSQuasiServiceException if building the cluster fails with an
 *                                   IOException
 */
private MiniDFSCluster createCluster() throws HDFSQuasiServiceException {
  final File baseDir = new File(getWorkingDir()).getAbsoluteFile();
  // Start from a clean directory, then point the mini cluster at it.
  FileUtil.fullyDelete(baseDir);
  this.conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
      baseDir.getAbsolutePath());
  LOG.info("Using base dir " + baseDir.getAbsolutePath());

  final MiniDFSCluster.Builder clusterBuilder =
      new MiniDFSCluster.Builder(this.conf);
  clusterBuilder.numDataNodes(getNumberOfDataNodes());

  final MiniDFSCluster hdfsCluster;
  try {
    hdfsCluster = clusterBuilder.build();
  } catch (IOException e) {
    LOG.error("Error in creating mini DFS cluster ", e);
    throw new HDFSQuasiServiceException(
        "Error in creating mini DFS cluster ", e);
  }

  LOG.info("NameNode: "
      + hdfsCluster.getNameNode().getNameNodeAddressHostPortString());
  for (DataNode dn : hdfsCluster.getDataNodes()) {
    LOG.info("DataNode: " + dn.getDisplayName());
  }
  return hdfsCluster;
}
/**
 * Handles a GET under "/getFileChecksum": computes the checksum of the
 * requested path through a DFS client and writes it to the response as
 * XML. An IOException or InterruptedException raised while obtaining the
 * checksum is written into the XML document instead of being propagated.
 */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  final PrintWriter writer = response.getWriter();
  final String path = ServletUtil.getDecodedPath(request, "/getFileChecksum");
  final XMLOutputter xml = new XMLOutputter(writer, "UTF-8");
  xml.declaration();

  final DataNode datanode =
      (DataNode) getServletContext().getAttribute("datanode");
  final Configuration conf = new HdfsConfiguration(datanode.getConf());

  try {
    final DFSClient client = DatanodeJspHelper.getDFSClient(
        request, datanode, conf, getUGI(request, conf));
    final MD5MD5CRC32FileChecksum fileChecksum =
        client.getFileChecksum(path, Long.MAX_VALUE);
    MD5MD5CRC32FileChecksum.write(xml, fileChecksum);
  } catch (IOException ioe) {
    writeXml(ioe, path, xml);
  } catch (InterruptedException e) {
    writeXml(e, path, xml);
  }
  xml.endDocument();
}
/**
 * Multiple-NameNode version of injectBlocks: injects blocks into a datanode
 * whose dataset is a {@code SimulatedFSDataset}, using the block pool of
 * the given NameNode, then schedules a full block report on that datanode.
 *
 * @param nameNodeIndex index of the NameNode whose block pool id is used
 * @param dataNodeIndex index of the target datanode in this cluster's
 *                      datanode list; must be in
 *                      {@code [0, dataNodes.size())}
 * @param blocksToInject the blocks to inject
 * @throws IndexOutOfBoundsException if {@code dataNodeIndex} is out of range
 * @throws IOException if the datanode's dataset is not a
 *                     SimulatedFSDataset, or propagated from the injection
 */
public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
    Iterable<Block> blocksToInject) throws IOException {
  // Valid indices stop at size() - 1, so size() itself must be rejected too.
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException("dataNodeIndex " + dataNodeIndex
        + " is out of range [0, " + dataNodes.size() + ")");
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
  }
  final String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
  final SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  // Delay 0: schedule the block report without additional delay.
  dn.scheduleAllBlockReport(0);
}
void register() throws IOException { // get versions from the namenode nsInfo = nameNodeProto.versionRequest(); dnRegistration = new DatanodeRegistration( new DatanodeID(DNS.getDefaultIP("default"), DNS.getDefaultHost("default", "default"), DataNode.generateUuid(), getNodePort(dnIdx), DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT), new DataStorage(nsInfo), new ExportedBlockKeys(), VersionInfo.getVersion()); // register datanode dnRegistration = nameNodeProto.registerDatanode(dnRegistration); //first block reports storage = new DatanodeStorage(DatanodeStorage.generateUuid()); final StorageBlockReport[] reports = { new StorageBlockReport(storage, BlockListAsLongs.EMPTY) }; nameNodeProto.blockReport(dnRegistration, nameNode.getNamesystem().getBlockPoolId(), reports, new BlockReportContext(1, 0, System.nanoTime())); }
/**
 * Verifies that an inter-datanode RPC fails with a
 * {@link SocketTimeoutException} when the server side does not respond.
 */
@Test(expected=SocketTimeoutException.class)
public void testInterDNProtocolTimeout() throws Throwable {
  final Server silentServer = new TestServer(1, true);
  silentServer.start();

  final InetSocketAddress serverAddr =
      NetUtils.getConnectAddress(silentServer);
  final DatanodeID fakeDnId =
      DFSTestUtil.getLocalDatanodeID(serverAddr.getPort());
  final DatanodeInfo targetInfo = new DatanodeInfo(fakeDnId);

  InterDatanodeProtocol proxy = null;
  try {
    proxy = DataNode.createInterDataNodeProtocolProxy(
        targetInfo, conf, 500, false);
    final RecoveringBlock recovering =
        new RecoveringBlock(new ExtendedBlock("bpid", 1), null, 100);
    proxy.initReplicaRecovery(recovering);
    fail ("Expected SocketTimeoutException exception, but did not get.");
  } finally {
    // Always release the proxy (if one was created) and stop the server.
    if (proxy != null) {
      RPC.stopProxy(proxy);
    }
    silentServer.stop();
  }
}
@Test public void testClose() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test close testClose(dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testAppend() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test append testAppend(bpid, dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testWriteToRbw() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test writeToRbw testWriteToRbw(dataSet, blocks); } finally { cluster.shutdown(); } }
@Test public void testWriteToTemporary() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, dataSet); // test writeToTemporary testWriteToTemporary(dataSet, blocks); } finally { cluster.shutdown(); } }
/**
 * Builds the dataset under test on top of a mocked datanode and storage:
 * sets the scan period key to 0, creates the initial storage directories,
 * registers every test block pool, and checks the starting volume counts.
 */
@Before
public void setUp() throws IOException {
  this.datanode = mock(DataNode.class);
  this.storage = mock(DataStorage.class);

  this.conf = new Configuration();
  this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
  final DNConf datanodeConf = new DNConf(this.conf);

  // Wire the mocked datanode to the configuration created above.
  when(this.datanode.getConf()).thenReturn(this.conf);
  when(this.datanode.getDnConf()).thenReturn(datanodeConf);
  final BlockScanner scanner = new BlockScanner(this.datanode, this.conf);
  when(this.datanode.getBlockScanner()).thenReturn(scanner);

  createStorageDirs(this.storage, this.conf, NUM_INIT_VOLUMES);
  this.dataset = new FsDatasetImpl(this.datanode, this.storage, this.conf);
  for (String poolId : BLOCK_POOL_IDS) {
    this.dataset.addBlockPool(poolId, this.conf);
  }

  assertEquals(NUM_INIT_VOLUMES, this.dataset.getVolumes().size());
  assertEquals(0, this.dataset.getNumFailedVolumes());
}
static void assertReports(int numDatanodes, DatanodeReportType type, DFSClient client, List<DataNode> datanodes, String bpid) throws IOException { final DatanodeInfo[] infos = client.datanodeReport(type); assertEquals(numDatanodes, infos.length); final DatanodeStorageReport[] reports = client.getDatanodeStorageReport(type); assertEquals(numDatanodes, reports.length); for(int i = 0; i < infos.length; i++) { assertEquals(infos[i], reports[i].getDatanodeInfo()); final DataNode d = findDatanode(infos[i].getDatanodeUuid(), datanodes); if (bpid != null) { //check storage final StorageReport[] computed = reports[i].getStorageReports(); Arrays.sort(computed, CMP); final StorageReport[] expected = d.getFSDataset().getStorageReports(bpid); Arrays.sort(expected, CMP); assertEquals(expected.length, computed.length); for(int j = 0; j < expected.length; j++) { assertEquals(expected[j].getStorage().getStorageID(), computed[j].getStorage().getStorageID()); } } } }
/**
 * Reads a datanode's host name reflectively, so the code works with either
 * {@code getDisplayName} (hadoop 2) or {@code getHostName} (hadoop 1).
 * Anything from the first ':' onward is dropped from the result.
 *
 * @param dn the datanode to query
 * @return the name up to (not including) the first ':', or the whole name
 *         when it contains no ':'
 * @throws RuntimeException if neither accessor exists on DataNode
 */
private String getHostName(DataNode dn)
    throws InvocationTargetException, IllegalAccessException {
  Method accessor;
  try {
    accessor = DataNode.class.getMethod("getDisplayName");
  } catch (NoSuchMethodException noDisplayName) {
    // Fall back to the older accessor name.
    try {
      accessor = DataNode.class.getMethod("getHostName");
    } catch (NoSuchMethodException noHostName) {
      throw new RuntimeException(noHostName);
    }
  }

  final String name = (String) accessor.invoke(dn);
  return name.contains(":") ? name.split(":")[0] : name;
}
@Test public void testClusterSetDatanodeDifferentStorageType() throws IOException { final Configuration conf = new HdfsConfiguration(); StorageType[][] storageType = new StorageType[][] { {StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK}, {StorageType.ARCHIVE}}; final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3).storageTypes(storageType).build(); try { cluster.waitActive(); ArrayList<DataNode> dataNodes = cluster.getDataNodes(); // Check the number of directory in DN's for (int i = 0; i < storageType.length; i++) { assertEquals(DataNode.getStorageLocations(dataNodes.get(i).getConf()) .size(), storageType[i].length); } } finally { MiniDFSCluster.shutdownCluster(cluster); } }
/** * Create a file with one block and corrupt some/all of the block replicas. */ private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl, int corruptBlockCount) throws IOException, AccessControlException, FileNotFoundException, UnresolvedLinkException, InterruptedException, TimeoutException { DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0); DFSTestUtil.waitReplication(dfs, filePath, repl); // Locate the file blocks by asking name node final LocatedBlocks locatedblocks = dfs.dfs.getNamenode() .getBlockLocations(filePath.toString(), 0L, BLOCK_SIZE); Assert.assertEquals(repl, locatedblocks.get(0).getLocations().length); // The file only has one block LocatedBlock lblock = locatedblocks.get(0); DatanodeInfo[] datanodeinfos = lblock.getLocations(); ExtendedBlock block = lblock.getBlock(); // corrupt some /all of the block replicas for (int i = 0; i < corruptBlockCount; i++) { DatanodeInfo dninfo = datanodeinfos[i]; final DataNode dn = cluster.getDataNode(dninfo.getIpcPort()); cluster.corruptReplica(dn, block); LOG.debug("Corrupted block " + block.getBlockName() + " on data node " + dninfo); } }
/**
 * Shuts down the datanodes that host selected replicas of the first block
 * location and marks them dead in the cluster.
 *
 * @param locs block locations; only the first entry's names are used, and
 *             nothing happens when this is null or empty
 * @param datanodes indices into the first location's name list identifying
 *                  the datanodes to stop
 * @throws IOException propagated from reading the location names
 */
private void stopDataNodes(BlockLocation[] locs, int[] datanodes)
    throws IOException {
  if (locs == null || locs.length == 0) {
    return;
  }
  for (int failedDNIdx : datanodes) {
    final String name = (locs[0].getNames())[failedDNIdx];
    for (DataNode dn : cluster.getDataNodes()) {
      // Match the exact ":port" suffix. A plain substring test would let
      // port 5001 match "host:50010" and stop the wrong datanode.
      // NOTE(review): assumes names are "host:xferPort" - confirm.
      final String portSuffix = ":" + dn.getXferPort();
      if (name.endsWith(portSuffix)) {
        dn.shutdown();
        cluster.setDataNodeDead(dn.getDatanodeId());
        LOG.info("stop datanode " + failedDNIdx);
        break;
      }
    }
  }
}
@Test public void testClusterNoStorageTypeSetForDatanodes() throws IOException { final Configuration conf = new HdfsConfiguration(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3).build(); try { cluster.waitActive(); ArrayList<DataNode> dataNodes = cluster.getDataNodes(); // Check the number of directory in DN's for (DataNode datanode : dataNodes) { assertEquals(DataNode.getStorageLocations(datanode.getConf()).size(), 2); } } finally { MiniDFSCluster.shutdownCluster(cluster); } }
/**
 * Blocks until no datanode in the cluster reports pending asynchronous
 * block deletions through its dataset test utilities, polling every
 * 1000 ms for up to 10000 ms.
 *
 * @param cluster the cluster whose datanodes are polled
 * @throws TimeoutException propagated from the wait helper
 * @throws InterruptedException propagated from the wait helper
 */
public static void waitForDNDeletions(final MiniDFSCluster cluster)
    throws TimeoutException, InterruptedException {
  final Supplier<Boolean> deletionsDrained = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      for (DataNode dn : cluster.getDataNodes()) {
        final boolean stillPending =
            cluster.getFsDatasetTestUtils(dn).getPendingAsyncDeletions() > 0;
        if (stillPending) {
          return false;
        }
      }
      return true;
    }
  };
  GenericTestUtils.waitFor(deletionsDrained, 1000, 10000);
}
void register() throws IOException { // get versions from the namenode nsInfo = nameNodeProto.versionRequest(); dnRegistration = new DatanodeRegistration( new DatanodeID(DNS.getDefaultIP("default"), DNS.getDefaultHost("default", "default"), DataNode.generateUuid(), getNodePort(dnIdx), DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT), new DataStorage(nsInfo), new ExportedBlockKeys(), VersionInfo.getVersion()); // register datanode dnRegistration = dataNodeProto.registerDatanode(dnRegistration); dnRegistration.setNamespaceInfo(nsInfo); //first block reports storage = new DatanodeStorage(DatanodeStorage.generateUuid()); final StorageBlockReport[] reports = { new StorageBlockReport(storage, BlockListAsLongs.EMPTY) }; dataNodeProto.blockReport(dnRegistration, bpid, reports, new BlockReportContext(1, 0, System.nanoTime(), 0L)); }
/**
 * Asserts that a striped block is in the expected under-construction state:
 * no nodes, not complete, the test's data/parity block counts, a zero block
 * group index in its id, and UNDER_CONSTRUCTION as its state.
 *
 * @param block the striped block to check
 * @param checkReplica when true, also asserts that the expected location
 *                     count equals GROUP_SIZE and that every datanode of
 *                     the cluster is among the expected storage locations
 */
private void checkStripedBlockUC(BlockInfoStriped block,
    boolean checkReplica) {
  assertEquals(0, block.numNodes());
  Assert.assertFalse(block.isComplete());
  Assert.assertEquals(StripedFileTestUtil.NUM_DATA_BLOCKS,
      block.getDataBlockNum());
  Assert.assertEquals(StripedFileTestUtil.NUM_PARITY_BLOCKS,
      block.getParityBlockNum());
  Assert.assertEquals(0,
      block.getBlockId() & HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
  Assert.assertEquals(HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
      block.getBlockUCState());

  if (!checkReplica) {
    return;
  }

  Assert.assertEquals(GROUP_SIZE,
      block.getUnderConstructionFeature().getNumExpectedLocations());
  final DatanodeStorageInfo[] expectedStorages =
      block.getUnderConstructionFeature().getExpectedStorageLocations();
  for (DataNode dn : cluster.getDataNodes()) {
    Assert.assertTrue(
        includeDataNode(dn.getDatanodeId(), expectedStorages));
  }
}
@Test public void testClose() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build(); try { cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn); // set up replicasMap String bpid = cluster.getNamesystem().getBlockPoolId(); ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn)); // test close testClose(dataSet, blocks); } finally { cluster.shutdown(); } }