/**
 * Check that the NameNode is not attempting to cache anything.
 */
private void checkPendingCachedEmpty(MiniDFSCluster cluster)
    throws Exception {
  cluster.getNamesystem().readLock();
  try {
    final DatanodeManager datanodeManager =
        cluster.getNamesystem().getBlockManager().getDatanodeManager();
    for (DataNode dn : cluster.getDataNodes()) {
      DatanodeDescriptor descriptor =
          datanodeManager.getDatanode(dn.getDatanodeId());
      Assert.assertTrue("Pending cached list of " + descriptor +
              " is not empty, "
              + Arrays.toString(descriptor.getPendingCached().toArray()),
          descriptor.getPendingCached().isEmpty());
    }
  } finally {
    cluster.getNamesystem().readUnlock();
  }
}
DatanodeInfo[] datanodeReport(final DatanodeReportType type)
    throws AccessControlException, StandbyException {
  checkSuperuserPrivilege();
  checkOperation(OperationCategory.UNCHECKED);
  readLock();
  try {
    checkOperation(OperationCategory.UNCHECKED);
    final DatanodeManager dm = getBlockManager().getDatanodeManager();
    final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
    DatanodeInfo[] arr = new DatanodeInfo[results.size()];
    for (int i = 0; i < arr.length; i++) {
      arr[i] = new DatanodeInfo(results.get(i));
    }
    return arr;
  } finally {
    readUnlock();
  }
}
DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type)
    throws AccessControlException, StandbyException {
  checkSuperuserPrivilege();
  checkOperation(OperationCategory.UNCHECKED);
  readLock();
  try {
    checkOperation(OperationCategory.UNCHECKED);
    final DatanodeManager dm = getBlockManager().getDatanodeManager();
    final List<DatanodeDescriptor> datanodes =
        dm.getDatanodeListForReport(type);
    DatanodeStorageReport[] reports =
        new DatanodeStorageReport[datanodes.size()];
    for (int i = 0; i < reports.length; i++) {
      final DatanodeDescriptor d = datanodes.get(i);
      reports[i] = new DatanodeStorageReport(
          new DatanodeInfo(d), d.getStorageReports());
    }
    return reports;
  } finally {
    readUnlock();
  }
}
/**
 * Returned information is a JSON representation of a map, with the host name
 * as the key and a map of dead-node attribute keys to their values as the
 * value.
 */
@Override // NameNodeMXBean
public String getDeadNodes() {
  final Map<String, Map<String, Object>> info =
      new HashMap<String, Map<String, Object>>();
  final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
  blockManager.getDatanodeManager().fetchDatanodes(null, dead, true);
  for (DatanodeDescriptor node : dead) {
    Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
        .put("lastContact", getLastContact(node))
        .put("decommissioned", node.isDecommissioned())
        .put("xferaddr", node.getXferAddr())
        .build();
    info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
  }
  return JSON.toString(info);
}
/**
 * Returned information is a JSON representation of a map, with the host name
 * as the key and a map of decommissioning-node attribute keys to their values
 * as the value.
 */
@Override // NameNodeMXBean
public String getDecomNodes() {
  final Map<String, Map<String, Object>> info =
      new HashMap<String, Map<String, Object>>();
  final List<DatanodeDescriptor> decomNodeList =
      blockManager.getDatanodeManager().getDecommissioningNodes();
  for (DatanodeDescriptor node : decomNodeList) {
    Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
        .put("xferaddr", node.getXferAddr())
        .put("underReplicatedBlocks",
            node.decommissioningStatus.getUnderReplicatedBlocks())
        .put("decommissionOnlyReplicas",
            node.decommissioningStatus.getDecommissionOnlyReplicas())
        .put("underReplicateInOpenFiles",
            node.decommissioningStatus.getUnderReplicatedInOpenFiles())
        .build();
    info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
  }
  return JSON.toString(info);
}
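// A hedged usage sketch, not part of the original source: the two NameNodeMXBean
// getters above are typically surfaced through JMX. The bean name
// "Hadoop:service=NameNode,name=NameNodeInfo" and the attribute names
// "DeadNodes"/"DecomNodes" are assumptions here, and the probe is assumed to run
// inside the NameNode JVM or through a remote JMX connector.
public class NameNodeInfoProbe {
  public static void main(String[] args) throws Exception {
    javax.management.MBeanServer mbs =
        java.lang.management.ManagementFactory.getPlatformMBeanServer();
    javax.management.ObjectName name = new javax.management.ObjectName(
        "Hadoop:service=NameNode,name=NameNodeInfo");
    // Each attribute is the JSON string built by the corresponding getter:
    // a map keyed by "hostname:xferPort" with per-node attribute maps as values.
    String deadNodes = (String) mbs.getAttribute(name, "DeadNodes");
    String decomNodes = (String) mbs.getAttribute(name, "DecomNodes");
    System.out.println("dead nodes: " + deadNodes);
    System.out.println("decommissioning nodes: " + decomNodes);
  }
}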
@Test
public void testCreateInvalidTopology() throws Exception {
  NetworkTopology invalCluster = new NetworkTopology();
  DatanodeDescriptor[] invalDataNodes = new DatanodeDescriptor[] {
      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1")
  };
  invalCluster.add(invalDataNodes[0]);
  invalCluster.add(invalDataNodes[1]);
  try {
    invalCluster.add(invalDataNodes[2]);
    fail("expected InvalidTopologyException");
  } catch (NetworkTopology.InvalidTopologyException e) {
    assertTrue(e.getMessage().startsWith("Failed to add "));
    assertTrue(e.getMessage().contains(
        "You cannot have a rack and a non-rack node at the same " +
        "level of the network topology."));
  }
}
/**
 * @return the node which is expected to run the recovery of the
 * given block, which is known to be under construction inside the
 * given NameNode.
 */
public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
    ExtendedBlock blk) {
  BlockManager bm0 = nn.getNamesystem().getBlockManager();
  BlockInfoContiguous storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
  assertTrue("Block " + blk + " should be under construction, " +
      "got: " + storedBlock,
      storedBlock instanceof BlockInfoContiguousUnderConstruction);
  BlockInfoContiguousUnderConstruction ucBlock =
      (BlockInfoContiguousUnderConstruction) storedBlock;
  // We expect that the replica with the most recent heartbeat will be
  // the one to be in charge of the synchronization / recovery protocol.
  final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
  DatanodeStorageInfo expectedPrimary = storages[0];
  long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor()
      .getLastUpdateMonotonic();
  for (int i = 1; i < storages.length; i++) {
    final long lastUpdate = storages[i].getDatanodeDescriptor()
        .getLastUpdateMonotonic();
    if (lastUpdate > mostRecentLastUpdate) {
      expectedPrimary = storages[i];
      mostRecentLastUpdate = lastUpdate;
    }
  }
  return expectedPrimary.getDatanodeDescriptor();
}
/**
 * Checks NameNode tracking of a particular DataNode for correct reporting of
 * failed volumes.
 *
 * @param dm DatanodeManager to check
 * @param dn DataNode to check
 * @param expectCapacityKnown if true, then expect that the capacities of the
 *     volumes were known before the failures, and therefore the lost capacity
 *     can be reported
 * @param expectedFailedVolumes expected locations of failed volumes
 * @throws Exception if there is any failure
 */
private void checkFailuresAtNameNode(DatanodeManager dm, DataNode dn,
    boolean expectCapacityKnown, String... expectedFailedVolumes)
    throws Exception {
  DatanodeDescriptor dd = cluster.getNamesystem().getBlockManager()
      .getDatanodeManager().getDatanode(dn.getDatanodeId());
  assertEquals(expectedFailedVolumes.length, dd.getVolumeFailures());
  VolumeFailureSummary volumeFailureSummary = dd.getVolumeFailureSummary();
  if (expectedFailedVolumes.length > 0) {
    assertArrayEquals(expectedFailedVolumes,
        volumeFailureSummary.getFailedStorageLocations());
    assertTrue(volumeFailureSummary.getLastVolumeFailureDate() > 0);
    long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
        expectedFailedVolumes.length);
    assertEquals(expectedCapacityLost,
        volumeFailureSummary.getEstimatedCapacityLostTotal());
  } else {
    assertNull(volumeFailureSummary);
  }
}
/**
 * Returned information is a JSON representation of a map, with the host name
 * as the key and a map of dead-node attribute keys to their values as the
 * value.
 */
@Override // NameNodeMXBean
public String getDeadNodes() {
  final Map<String, Map<String, Object>> info =
      new HashMap<String, Map<String, Object>>();
  final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
  blockManager.getDatanodeManager().fetchDatanodes(null, dead, false);
  for (DatanodeDescriptor node : dead) {
    Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
        .put("lastContact", getLastContact(node))
        .put("decommissioned", node.isDecommissioned())
        .put("xferaddr", node.getXferAddr())
        .build();
    info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
  }
  return JSON.toString(info);
}
/**
 * @return the node which is expected to run the recovery of the
 * given block, which is known to be under construction inside the
 * given NameNode.
 */
public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
    ExtendedBlock blk) {
  BlockManager bm0 = nn.getNamesystem().getBlockManager();
  BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
  assertTrue("Block " + blk + " should be under construction, " +
      "got: " + storedBlock,
      !storedBlock.isComplete());
  // We expect that the replica with the most recent heartbeat will be
  // the one to be in charge of the synchronization / recovery protocol.
  final DatanodeStorageInfo[] storages = storedBlock
      .getUnderConstructionFeature().getExpectedStorageLocations();
  DatanodeStorageInfo expectedPrimary = storages[0];
  long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor()
      .getLastUpdateMonotonic();
  for (int i = 1; i < storages.length; i++) {
    final long lastUpdate = storages[i].getDatanodeDescriptor()
        .getLastUpdateMonotonic();
    if (lastUpdate > mostRecentLastUpdate) {
      expectedPrimary = storages[i];
      mostRecentLastUpdate = lastUpdate;
    }
  }
  return expectedPrimary.getDatanodeDescriptor();
}
/**
 * Wait until the given namenode gets first block reports from all the
 * datanodes.
 */
public void waitFirstBRCompleted(int nnIndex, int timeout)
    throws IOException, TimeoutException, InterruptedException {
  if (namenodes.size() == 0 || getNN(nnIndex) == null
      || getNN(nnIndex).nameNode == null) {
    return;
  }
  final FSNamesystem ns = getNamesystem(nnIndex);
  final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      List<DatanodeDescriptor> nodes =
          dm.getDatanodeListForReport(DatanodeReportType.LIVE);
      for (DatanodeDescriptor node : nodes) {
        if (!node.checkBlockReportReceived()) {
          return false;
        }
      }
      return true;
    }
  }, 100, timeout);
}
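// A hedged usage sketch (the cluster field, NameNode index 0 and the 10 second
// timeout are illustrative assumptions, not from the original source): after
// restarting a NameNode in a MiniDFSCluster, wait for every datanode to deliver
// its first full block report before making assertions that depend on block
// locations being known to that NameNode.
cluster.restartNameNode(0);
cluster.waitFirstBRCompleted(0, 10000);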
/**
 * Verify decommissioned nodes should not be selected.
 */
@Test
public void testPlacementWithLocalRackNodesDecommissioned() throws Exception {
  String clientMachine = "client.foo.com";
  // Map client to RACK3
  String clientRack = "/RACK3";
  StaticMapping.addNodeToRack(clientMachine, clientRack);
  final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
  DatanodeDescriptor dnd3 = dnm.getDatanode(
      cluster.getDataNodes().get(3).getDatanodeId());
  assertEquals(dnd3.getNetworkLocation(), clientRack);
  dnm.getDecomManager().startDecommission(dnd3);
  try {
    testPlacement(clientMachine, clientRack, false);
  } finally {
    dnm.getDecomManager().stopDecommission(dnd3);
  }
}
private boolean waitForBlockReport(final DataNode dn,
    final DatanodeDescriptor dnd) throws Exception {
  final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
  final long lastCount = storage.getBlockReportCount();
  dn.triggerBlockReport(
      new BlockReportOptions.Factory().setIncremental(false).build());
  try {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return lastCount != storage.getBlockReportCount();
      }
    }, 10, 100);
  } catch (TimeoutException te) {
    return false;
  }
  return true;
}
@Override // FSNamesystemMBean
public int getNumDecomLiveDataNodes() {
  final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
  int liveDecommissioned = 0;
  for (DatanodeDescriptor node : live) {
    liveDecommissioned += node.isDecommissioned() ? 1 : 0;
  }
  return liveDecommissioned;
}
@Override // FSNamesystemMBean
public int getNumDecomDeadDataNodes() {
  final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
  getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true);
  int deadDecommissioned = 0;
  for (DatanodeDescriptor node : dead) {
    deadDecommissioned += node.isDecommissioned() ? 1 : 0;
  }
  return deadDecommissioned;
}
@Override // FSNamesystemMBean
public int getVolumeFailuresTotal() {
  List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
  int volumeFailuresTotal = 0;
  for (DatanodeDescriptor node : live) {
    volumeFailuresTotal += node.getVolumeFailures();
  }
  return volumeFailuresTotal;
}
@Override // FSNamesystemMBean
public long getEstimatedCapacityLostTotal() {
  List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true);
  long estimatedCapacityLostTotal = 0;
  for (DatanodeDescriptor node : live) {
    VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
    if (volumeFailureSummary != null) {
      estimatedCapacityLostTotal +=
          volumeFailureSummary.getEstimatedCapacityLostTotal();
    }
  }
  return estimatedCapacityLostTotal;
}
public void setCachedLocations(LocatedBlock block) {
  CachedBlock cachedBlock =
      new CachedBlock(block.getBlock().getBlockId(), (short) 0, false);
  cachedBlock = cachedBlocks.get(cachedBlock);
  if (cachedBlock == null) {
    return;
  }
  List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
  for (DatanodeDescriptor datanode : datanodes) {
    block.addCachedLoc(datanode);
  }
}
public final void processCacheReport(final DatanodeID datanodeID,
    final List<Long> blockIds) throws IOException {
  namesystem.writeLock();
  final long startTime = Time.monotonicNow();
  final long endTime;
  try {
    final DatanodeDescriptor datanode =
        blockManager.getDatanodeManager().getDatanode(datanodeID);
    if (datanode == null || !datanode.isAlive) {
      throw new IOException(
          "processCacheReport from dead or unregistered datanode: " +
          datanode);
    }
    processCacheReportImpl(datanode, blockIds);
  } finally {
    endTime = Time.monotonicNow();
    namesystem.writeUnlock();
  }

  // Log the block report processing stats from Namenode perspective
  final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
  if (metrics != null) {
    metrics.addCacheBlockReport((int) (endTime - startTime));
  }
  LOG.debug("Processed cache report from {}, blocks: {}, " +
      "processing time: {} msecs", datanodeID, blockIds.size(),
      (endTime - startTime));
}
private void processCacheReportImpl(final DatanodeDescriptor datanode,
    final List<Long> blockIds) {
  CachedBlocksList cached = datanode.getCached();
  cached.clear();
  CachedBlocksList cachedList = datanode.getCached();
  CachedBlocksList pendingCachedList = datanode.getPendingCached();
  for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
    long blockId = iter.next();
    LOG.trace("Cache report from datanode {} has block {}",
        datanode, blockId);
    CachedBlock cachedBlock = new CachedBlock(blockId, (short) 0, false);
    CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
    // Add the block ID from the cache report to the cachedBlocks map
    // if it's not already there.
    if (prevCachedBlock != null) {
      cachedBlock = prevCachedBlock;
    } else {
      cachedBlocks.put(cachedBlock);
      LOG.trace("Added block {} to cachedBlocks", cachedBlock);
    }
    // Add the block to the datanode's implicit cached block list
    // if it's not already there.  Similarly, remove it from the pending
    // cached block list if it exists there.
    if (!cachedBlock.isPresent(cachedList)) {
      cachedList.add(cachedBlock);
      LOG.trace("Added block {} to CACHED list.", cachedBlock);
    }
    if (cachedBlock.isPresent(pendingCachedList)) {
      pendingCachedList.remove(cachedBlock);
      LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock);
    }
  }
}
@Before
public void setupDatanodes() {
  dataNodes = new DatanodeDescriptor[] {
      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"),
      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"),
      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2"),
      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3"),
      DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3"),
      DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3"),
      DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d3/r1"),
      DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/d3/r1"),
      DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/d3/r1"),
      DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/d3/r2"),
      DFSTestUtil.getDatanodeDescriptor("13.13.13.13", "/d3/r2"),
      DFSTestUtil.getDatanodeDescriptor("14.14.14.14", "/d4/r1"),
      DFSTestUtil.getDatanodeDescriptor("15.15.15.15", "/d4/r1"),
      DFSTestUtil.getDatanodeDescriptor("16.16.16.16", "/d4/r1"),
      DFSTestUtil.getDatanodeDescriptor("17.17.17.17", "/d4/r1"),
      DFSTestUtil.getDatanodeDescriptor("18.18.18.18", "/d4/r1"),
      DFSTestUtil.getDatanodeDescriptor("19.19.19.19", "/d4/r1"),
      DFSTestUtil.getDatanodeDescriptor("20.20.20.20", "/d4/r1"),
  };
  for (int i = 0; i < dataNodes.length; i++) {
    cluster.add(dataNodes[i]);
  }
  dataNodes[9].setDecommissioned();
  dataNodes[10].setDecommissioned();
}
@Test
public void testContains() throws Exception {
  DatanodeDescriptor nodeNotInMap =
      DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r4");
  for (int i = 0; i < dataNodes.length; i++) {
    assertTrue(cluster.contains(dataNodes[i]));
  }
  assertFalse(cluster.contains(nodeNotInMap));
}
/**
 * This picks a large number of nodes at random in order to ensure coverage.
 *
 * @param numNodes the number of nodes
 * @param excludedScope the excluded scope
 * @return the frequency that nodes were chosen
 */
private Map<Node, Integer> pickNodesAtRandom(int numNodes,
    String excludedScope) {
  Map<Node, Integer> frequency = new HashMap<Node, Integer>();
  for (DatanodeDescriptor dnd : dataNodes) {
    frequency.put(dnd, 0);
  }
  for (int j = 0; j < numNodes; j++) {
    Node random = cluster.chooseRandom(excludedScope);
    frequency.put(random, frequency.get(random) + 1);
  }
  return frequency;
}
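// A hedged usage sketch (assumed to live in the same test class as
// setupDatanodes() and pickNodesAtRandom(); the test name and trial count are
// illustrative): NetworkTopology.chooseRandom() treats a scope starting with
// "~" as an exclusion, so excluding "/d2" should never return a node from that
// datacenter, while every other node should be chosen at least once given
// enough trials.
@Test
public void testChooseRandomExcludedScope() throws Exception {
  Map<Node, Integer> frequency = pickNodesAtRandom(2000, "~/d2");
  for (Map.Entry<Node, Integer> entry : frequency.entrySet()) {
    if (entry.getKey().getNetworkLocation().startsWith("/d2")) {
      // Nodes inside the excluded scope must never be picked.
      assertEquals(0, entry.getValue().intValue());
    } else {
      // Everything else should show up with high probability after 2000 picks.
      assertTrue(entry.getValue() > 0);
    }
  }
}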
public static long getLiveDatanodeCapacity(DatanodeManager dm) {
  final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  dm.fetchDatanodes(live, null, false);
  long capacity = 0;
  for (final DatanodeDescriptor dn : live) {
    capacity += dn.getCapacity();
  }
  return capacity;
}
public static void waitForDatanodeStatus(DatanodeManager dm, int expectedLive,
    int expectedDead, long expectedVolFails, long expectedTotalCapacity,
    long timeout) throws InterruptedException, TimeoutException {
  final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
  final int ATTEMPTS = 10;
  int count = 0;
  long currTotalCapacity = 0;
  int volFails = 0;
  do {
    Thread.sleep(timeout);
    live.clear();
    dead.clear();
    dm.fetchDatanodes(live, dead, false);
    currTotalCapacity = 0;
    volFails = 0;
    for (final DatanodeDescriptor dd : live) {
      currTotalCapacity += dd.getCapacity();
      volFails += dd.getVolumeFailures();
    }
    count++;
  } while ((expectedLive != live.size() ||
            expectedDead != dead.size() ||
            expectedTotalCapacity != currTotalCapacity ||
            expectedVolFails != volFails) && count < ATTEMPTS);

  if (count == ATTEMPTS) {
    throw new TimeoutException("Timed out waiting for capacity."
        + " Live = " + live.size() + " Expected = " + expectedLive
        + " Dead = " + dead.size() + " Expected = " + expectedDead
        + " Total capacity = " + currTotalCapacity
        + " Expected = " + expectedTotalCapacity
        + " Vol Fails = " + volFails + " Expected = " + expectedVolFails);
  }
}
public static DatanodeStorageInfo createDatanodeStorageInfo(
    String storageID, String ip, String rack, String hostname,
    StorageType type) {
  final DatanodeStorage storage = new DatanodeStorage(storageID,
      DatanodeStorage.State.NORMAL, type);
  final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(
      ip, rack, storage, hostname);
  return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
}
public static DatanodeDescriptor[] toDatanodeDescriptor(
    DatanodeStorageInfo[] storages) {
  DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];
  for (int i = 0; i < datanodes.length; i++) {
    datanodes[i] = storages[i].getDatanodeDescriptor();
  }
  return datanodes;
}