private void waitForExcessReplicasToChange( FSNamesystem namesystem, Block block, int newReplicas) throws Exception { NumberReplicas num; long startChecking = System.currentTimeMillis(); do {"Waiting for a replica to become excess"); namesystem.readLock(); try { num = namesystem.countNodes(block); } finally { namesystem.readUnlock(); } Thread.sleep(100); if (System.currentTimeMillis() - startChecking > 30000) { fail("Timed out waiting for excess replicas to change"); } } while (num.excessReplicas() != newReplicas); }
private void waitForExcessReplicasToBeDeleted(FSNamesystem namesystem, Block block, DataNode dn) throws Exception { NumberReplicas num; long startChecking = System.currentTimeMillis(); do {"Waiting for the excess replica to be deleted"); dn.scheduleNSBlockReceivedAndDeleted(0); namesystem.readLock(); try { num = namesystem.countNodes(block); } finally { namesystem.readUnlock(); } Thread.sleep(100); if (System.currentTimeMillis() - startChecking > 30000) { fail("Timed out waiting for excess replicas to be deleted"); } } while (num.excessReplicas() != 0); }
private void waitForExcessReplicasToChange( FSNamesystem namesystem, Block block, int newReplicas) throws Exception { NumberReplicas num; long startChecking = System.currentTimeMillis(); do { namesystem.readLock(); try { num = namesystem.countNodes(block);"We have " + num.excessReplicas() + " excess replica"); } finally { namesystem.readUnlock(); } Thread.sleep(100); if (System.currentTimeMillis() - startChecking > 30000) { namesystem.metaSave("TestNodeCount.meta"); LOG.warn("Dumping meta into log directory"); fail("Timed out waiting for excess replicas to change"); } } while (num.excessReplicas() != newReplicas); }
/** * If there were any replication requests that timed out, reap them * and put them back into the neededReplication queue */ void processPendingReplications() { Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); if (timedOutItems != null) { namesystem.writeLock(); try { for (int i = 0; i < timedOutItems.length; i++) { NumberReplicas num = countNodes(timedOutItems[i]); if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]), num.liveReplicas())) { neededReplications.add(timedOutItems[i], num.liveReplicas(), num.decommissionedReplicas(), getReplication(timedOutItems[i])); } } } finally { namesystem.writeUnlock(); } /* If we know the target datanodes where the replication timedout, * we could invoke decBlocksScheduled() on it. Its ok for now. */ } }
/** * Return the number of nodes that are live and decommissioned. */ NumberReplicas countNodes(Block b) { int count = 0; int live = 0; int corrupt = 0; int excess = 0; Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b); Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b); while (nodeIter.hasNext()) { DatanodeDescriptor node =; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) { corrupt++; } else if (node.isDecommissionInProgress() || node.isDecommissioned()) { count++; } else { Collection<Block> blocksExcess = excessReplicateMap.get(node.getStorageID()); if (blocksExcess != null && blocksExcess.contains(b)) { excess++; } else { live++; } } } return new NumberReplicas(live, count, corrupt, excess); }
private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode, NumberReplicas num) { int curReplicas = num.liveReplicas(); int curExpectedReplicas = getReplication(block); INode fileINode = blocksMap.getINode(block); Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block); StringBuilder nodeList = new StringBuilder(); while (nodeIter.hasNext()) { DatanodeDescriptor node =; nodeList.append(; nodeList.append(" "); }"Block: " + block + ", Expected Replicas: " + curExpectedReplicas + ", live replicas: " + curReplicas + ", corrupt replicas: " + num.corruptReplicas() + ", decommissioned replicas: " + num.decommissionedReplicas() + ", excess replicas: " + num.excessReplicas() + ", Is Open File: " + fileINode.isUnderConstruction() + ", Datanodes having this block: " + nodeList + ", Current Datanode: " + + ", Is current datanode decommissioning: " + srcNode.isDecommissionInProgress()); }
void updateNeededReplications(Block block, int curReplicasDelta, int expectedReplicasDelta) { namesystem.writeLock(); try { NumberReplicas repl = countNodes(block); int curExpectedReplicas = getReplication(block); if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) { neededReplications.update(block, repl.liveReplicas(), repl .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta, expectedReplicasDelta); } else { int oldReplicas = repl.liveReplicas()-curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(), oldExpectedReplicas); } } finally { namesystem.writeUnlock(); } }
private void waitForExcessReplicasToChange( FSNamesystem namesystem, Block block, int newReplicas) throws Exception { NumberReplicas num; long startChecking = System.currentTimeMillis(); do { namesystem.readLock(); try { num = namesystem.countNodes(block); } finally { namesystem.readUnlock(); } Thread.sleep(100); if (System.currentTimeMillis() - startChecking > 30000) { fail("Timed out waiting for excess replicas to change"); } } while (num.excessReplicas() != newReplicas); }
/** * For each block in the name-node verify whether it belongs to any file, * over or under replicated. Place it into the respective queue. */ void processMisReplicatedBlocks() { long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0; namesystem.writeLock(); try { neededReplications.clear(); for (BlockInfo block : blocksMap.getBlocks()) { INodeFile fileINode = block.getINode(); if (fileINode == null) { // block does not belong to any file nrInvalid++; addToInvalidates(block); continue; } // calculate current replication short expectedReplication = fileINode.getReplication(); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); // add to under-replicated queue if need to be if (isNeededReplication(block, expectedReplication, numCurrentReplica)) { if (neededReplications.add(block, numCurrentReplica, num .decommissionedReplicas(), expectedReplication)) { nrUnderReplicated++; } } if (numCurrentReplica > expectedReplication) { // over-replicated block nrOverReplicated++; processOverReplicatedBlock(block, expectedReplication, null, null); } } } finally { namesystem.writeUnlock(); }"Total number of blocks = " + blocksMap.size());"Number of invalid blocks = " + nrInvalid);"Number of under-replicated blocks = " + nrUnderReplicated);"Number of over-replicated blocks = " + nrOverReplicated); }
void checkReplication(Block block, int numExpectedReplicas) { // filter out containingNodes that are marked for decommission. NumberReplicas number = countNodes(block); if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) { neededReplications.add(block, number.liveReplicas(), number.decommissionedReplicas, numExpectedReplicas); } }
public void testUnderReplicationWithDecommissionDataNode() throws Exception { final Configuration conf = new Configuration(); final short REPLICATION_FACTOR = (short)1; File f = new File(HOST_FILE_PATH); if (f.exists()) { f.delete(); } f.createNewFile(); conf.set("dfs.hosts.exclude", HOST_FILE_PATH);"Start the cluster"); final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null); try { final FSNamesystem namesystem = cluster.getNameNode().namesystem; final FileSystem fs = cluster.getFileSystem(); DatanodeDescriptor[] datanodes = (DatanodeDescriptor[]) namesystem.heartbeats.toArray( new DatanodeDescriptor[REPLICATION_FACTOR]); assertEquals(1, datanodes.length); // populate the cluster with a one block file final Path FILE_PATH = new Path("/testfile2"); DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L); DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR); Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH); // shutdown the datanode DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]); assertEquals(1, namesystem.getMissingBlocksCount()); // one missing block assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks()); // Make the only datanode to be decommissioned"Decommission the datanode " + dnprop); addToExcludeFile(namesystem.getConf(), datanodes); namesystem.refreshNodes(namesystem.getConf()); // bring up the datanode cluster.restartDataNode(dnprop); // Wait for block report"wait for its block report to come in"); NumberReplicas num; long startTime = System.currentTimeMillis(); do { namesystem.readLock(); try { num = namesystem.countNodes(block); } finally { namesystem.readUnlock(); } Thread.sleep(1000);"live: " + num.liveReplicas() + "Decom: " + num.decommissionedReplicas()); } while (num.decommissionedReplicas() != 1 && System.currentTimeMillis() - startTime < 30000); assertEquals("Decommissioning Replicas doesn't reach 1", 1, num.decommissionedReplicas()); assertEquals(1, namesystem.getNonCorruptUnderReplicatedBlocks()); assertEquals(0, namesystem.getMissingBlocksCount()); } finally { cluster.shutdown(); } }
/** * Take down a datanode to generate raid missing blocks, and then bring it back * will restore the missing blocks. */ @Test public void testRaidMissingBlocksByTakingDownDataNode() throws IOException, InterruptedException { MiniDFSCluster cluster = null; Configuration conf = new Configuration(); try { cluster = new MiniDFSCluster(conf, 1, true, null); final FSNamesystem namesystem = cluster.getNameNode().namesystem; final DistributedFileSystem dfs = DFSUtil.convertToDFS(cluster.getFileSystem()); String filePath = "/test/file1"; RaidCodec rsCodec = RaidCodec.getCodec("rs"); RaidDFSUtil.constructFakeRaidFile(dfs, filePath, rsCodec); DatanodeDescriptor[] datanodes = (DatanodeDescriptor[]) namesystem.heartbeats.toArray( new DatanodeDescriptor[1]); assertEquals(1, datanodes.length); // shutdown the datanode DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]); assertEquals(rsCodec.numStripeBlocks, namesystem.getRaidMissingBlocksCount()); assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing block assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks()); // bring up the datanode cluster.restartDataNode(dnprop); // Wait for block report"wait for its block report to come in"); NumberReplicas num; FileStatus stat = dfs.getFileStatus(new Path(filePath)); LocatedBlocks blocks = dfs.getClient(). getLocatedBlocks(filePath, 0, stat.getLen()); long startTime = System.currentTimeMillis(); do { Thread.sleep(1000); int totalCount = 0; namesystem.readLock(); try { for (LocatedBlock block : blocks.getLocatedBlocks()) { num = namesystem.countNodes(block.getBlock()); totalCount += num.liveReplicas(); } if (totalCount == rsCodec.numDataBlocks) { break; } else {"wait for block report, received total replicas: " + totalCount); } } finally { namesystem.readUnlock(); } } while (System.currentTimeMillis() - startTime < 30000); assertEquals(0, namesystem.getRaidMissingBlocksCount()); assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing block assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks()); } finally { if (cluster != null) { cluster.shutdown(); } } }
public void testInvalidateMultipleReplicas() throws Exception { Configuration conf = new Configuration(); MiniDFSCluster cluster = new MiniDFSCluster(conf, 5, true, null); final int FILE_LEN = 123; final String pathStr = "/testInvalidateMultipleReplicas"; try { FileSystem fs = cluster.getFileSystem(); Path path = new Path(pathStr); cluster.waitActive(); // create a small file on 3 nodes DFSTestUtil.createFile(fs, path, 123, (short)3, 0); DFSTestUtil.waitReplication(fs, path, (short)3); NameNode nn = cluster.getNameNode(); LocatedBlocks located = nn.getBlockLocations(pathStr, 0, FILE_LEN); // Get the original block locations List<LocatedBlock> blocks = located.getLocatedBlocks(); LocatedBlock firstBlock = blocks.get(0); DatanodeInfo[] locations = firstBlock.getLocations(); assertEquals("Should have 3 good blocks", 3, locations.length); nn.getNamesystem().stallReplicationWork(); DatanodeInfo[] badLocations = new DatanodeInfo[2]; badLocations[0] = locations[0]; badLocations[1] = locations[1]; // Report some blocks corrupt LocatedBlock badLBlock = new LocatedBlock( firstBlock.getBlock(), badLocations); nn.reportBadBlocks(new LocatedBlock[] {badLBlock}); nn.getNamesystem().restartReplicationWork(); DFSTestUtil.waitReplication(fs, path, (short)3); NumberReplicas num = nn.getNamesystem().countNodes( firstBlock.getBlock()); assertEquals(0, num.corruptReplicas()); } finally { cluster.shutdown(); } }
void metaSave(PrintWriter out) { // // Dump contents of neededReplication // synchronized (neededReplications) { out.println("Metasave: Blocks waiting for replication: " + neededReplications.size()); for (Block block : neededReplications) { List<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>(); NumberReplicas numReplicas = new NumberReplicas(); // source node returned is not used chooseSourceDatanode(block, containingNodes, numReplicas); int usableReplicas = numReplicas.liveReplicas() + numReplicas.decommissionedReplicas(); if (block instanceof BlockInfo) { String fileName = ((BlockInfo)block).getINode().getFullPathName(); out.print(fileName + ": "); } // l: == live:, d: == decommissioned c: == corrupt e: == excess out.print(block + ((usableReplicas > 0)? "" : " MISSING") + " (replicas:" + " l: " + numReplicas.liveReplicas() + " d: " + numReplicas.decommissionedReplicas() + " c: " + numReplicas.corruptReplicas() + " e: " + numReplicas.excessReplicas() + ") "); Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block); for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block); jt.hasNext();) { DatanodeDescriptor node =; String state = ""; if (corruptNodes != null && corruptNodes.contains(node)) { state = "(corrupt)"; } else if (node.isDecommissioned() || node.isDecommissionInProgress()) { state = "(decommissioned)"; } out.print(" " + node + state + " : "); } out.println(""); } } // // Dump blocks from pendingReplication // pendingReplications.metaSave(out); // // Dump blocks that are waiting to be deleted // dumpRecentInvalidateSets(out); }
/** * Parse the data-nodes the block belongs to and choose one, * which will be the replication source. * * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes * since the former do not have write traffic and hence are less busy. * We do not use already decommissioned nodes as a source. * Otherwise we choose a random node among those that did not reach their * replication limit. * * In addition form a list of all nodes containing the block * and calculate its replication numbers. */ private DatanodeDescriptor chooseSourceDatanode( Block block, List<DatanodeDescriptor> containingNodes, NumberReplicas numReplicas) { containingNodes.clear(); DatanodeDescriptor srcNode = null; int live = 0; int decommissioned = 0; int corrupt = 0; int excess = 0; Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block); Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block); while(it.hasNext()) { DatanodeDescriptor node =; Collection<Block> excessBlocks = excessReplicateMap.get(node.getStorageID()); if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) corrupt++; else if (node.isDecommissionInProgress() || node.isDecommissioned()) decommissioned++; else if (excessBlocks != null && excessBlocks.contains(block)) { excess++; } else { live++; } containingNodes.add(node); // Check if this replica is corrupt // If so, do not select the node as src node if ((nodesCorrupt != null) && nodesCorrupt.contains(node)) continue; if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) continue; // already reached replication limit // the block must not be scheduled for removal on srcNode if(excessBlocks != null && excessBlocks.contains(block)) continue; // never use already decommissioned nodes if(node.isDecommissioned()) continue; // we prefer nodes that are in DECOMMISSION_INPROGRESS state if(node.isDecommissionInProgress() || srcNode == null) { srcNode = node; continue; } if(srcNode.isDecommissionInProgress()) continue; // switch to a different node randomly // this to prevent from deterministically selecting the same node even // if the node failed to replicate the block on previous iterations if(r.nextBoolean()) srcNode = node; } if(numReplicas != null) numReplicas.initialize(live, decommissioned, corrupt, excess); return srcNode; }
/** * Return true if there are any blocks on this node that have not * yet reached their replication factor. Otherwise returns false. */ boolean isReplicationInProgress(DatanodeDescriptor srcNode) { boolean status = false; int underReplicatedBlocks = 0; int decommissionOnlyReplicas = 0; int underReplicatedInOpenFiles = 0; final Iterator<? extends Block> it = srcNode.getBlockIterator(); while(it.hasNext()) { final Block block =; INode fileINode = blocksMap.getINode(block); if (fileINode != null) { NumberReplicas num = countNodes(block); int curReplicas = num.liveReplicas(); int curExpectedReplicas = getReplication(block); if (isNeededReplication(block, curExpectedReplicas, curReplicas)) { if (curExpectedReplicas > curReplicas) { //Log info about one block for this node which needs replication if (!status) { status = true; logBlockReplicationInfo(block, srcNode, num); } underReplicatedBlocks++; if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) { decommissionOnlyReplicas++; } if (fileINode.isUnderConstruction()) { underReplicatedInOpenFiles++; } } if (!neededReplications.contains(block) && pendingReplications.getNumReplicas(block) == 0) { // // These blocks have been reported from the datanode // after the startDecommission method has been executed. These // blocks were in flight when the decommissioning was started. // neededReplications.add(block, curReplicas, num.decommissionedReplicas(), curExpectedReplicas); } } } } srcNode.decommissioningStatus.set(underReplicatedBlocks, decommissionOnlyReplicas, underReplicatedInOpenFiles); return status; }
public void testUnderReplicationWithDecommissionDataNode() throws Exception { final Configuration conf = new Configuration(); final short REPLICATION_FACTOR = (short)1; File f = new File(HOST_FILE_PATH); if (f.exists()) { f.delete(); } conf.set("dfs.hosts.exclude", HOST_FILE_PATH);"Start the cluster"); final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null); try { final FSNamesystem namesystem = cluster.getNameNode().namesystem; final FileSystem fs = cluster.getFileSystem(); DatanodeDescriptor[] datanodes = (DatanodeDescriptor[]) namesystem.heartbeats.toArray( new DatanodeDescriptor[REPLICATION_FACTOR]); assertEquals(1, datanodes.length); // populate the cluster with a one block file final Path FILE_PATH = new Path("/testfile2"); DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L); DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR); Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH); // shutdown the datanode DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]); assertEquals(1, namesystem.getMissingBlocksCount()); // one missing block assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks()); // Make the only datanode to be decommissioned"Decommission the datanode " + dnprop); addToExcludeFile(namesystem.getConf(), datanodes); namesystem.refreshNodes(namesystem.getConf()); // bring up the datanode cluster.restartDataNode(dnprop); // Wait for block report"wait for its block report to come in"); NumberReplicas num; long startTime = System.currentTimeMillis(); do { namesystem.readLock(); try { num = namesystem.countNodes(block); } finally { namesystem.readUnlock(); } Thread.sleep(1000);"live: " + num.liveReplicas() + "Decom: " + num.decommissionedReplicas()); } while (num.decommissionedReplicas() != 1 && System.currentTimeMillis() - startTime < 30000); assertEquals("Decommissioning Replicas doesn't reach 1", 1, num.decommissionedReplicas()); assertEquals(1, namesystem.getNonCorruptUnderReplicatedBlocks()); assertEquals(0, namesystem.getMissingBlocksCount()); } finally { cluster.shutdown(); } }