/**
 * Periodically go over the list of lazyPersist files with missing
 * blocks and unlink them from the namespace.
 */
private void clearCorruptLazyPersistFiles()
    throws IOException {

  BlockStoragePolicy lpPolicy = blockManager.getStoragePolicy("LAZY_PERSIST");

  List<BlockCollection> filesToDelete = new ArrayList<>();
  boolean changed = false;
  writeLock();
  try {
    final Iterator<Block> it =
        blockManager.getCorruptReplicaBlockIterator();

    while (it.hasNext()) {
      Block b = it.next();
      BlockInfoContiguous blockInfo = blockManager.getStoredBlock(b);
      if (blockInfo.getBlockCollection().getStoragePolicyID()
          == lpPolicy.getId()) {
        filesToDelete.add(blockInfo.getBlockCollection());
      }
    }

    for (BlockCollection bc : filesToDelete) {
      LOG.warn("Removing lazyPersist file " + bc.getName() +
          " with no replicas.");
      BlocksMapUpdateInfo toRemoveBlocks =
          FSDirDeleteOp.deleteInternal(
              FSNamesystem.this, bc.getName(),
              INodesInPath.fromINode((INodeFile) bc), false);
      changed |= toRemoveBlocks != null;
      if (toRemoveBlocks != null) {
        removeBlocks(toRemoveBlocks); // Incremental deletion of blocks
      }
    }
  } finally {
    writeUnlock();
  }
  if (changed) {
    getEditLog().logSync();
  }
}
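// The periodic driver for the method above is not shown in this excerpt.
// As a minimal, hypothetical sketch of how it could be scheduled inside
// FSNamesystem (the names LazyPersistFileScrubber and scrubIntervalSec are
// illustrative assumptions, not confirmed by this snippet):
class LazyPersistFileScrubber implements Runnable {
  private final int scrubIntervalSec;        // seconds between scans
  private volatile boolean shouldRun = true; // cleared on shutdown

  LazyPersistFileScrubber(int scrubIntervalSec) {
    this.scrubIntervalSec = scrubIntervalSec;
  }

  @Override
  public void run() {
    while (shouldRun) {
      try {
        clearCorruptLazyPersistFiles();      // the method defined above
        Thread.sleep(scrubIntervalSec * 1000L);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve interrupt status
        return;
      } catch (IOException e) {
        LOG.error("Ignoring exception in LazyPersistFileScrubber:", e);
      }
    }
  }
}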
@Override
public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection inode,
    Block block, short replicationFactor,
    Collection<DatanodeStorageInfo> first,
    Collection<DatanodeStorageInfo> second,
    List<StorageType> excessTypes) {
  Collection<DatanodeStorageInfo> chooseFrom =
      !first.isEmpty() ? first : second;

  List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
  return l.get(DFSUtil.getRandom().nextInt(l.size()));
}
/**
 * Periodically go over the list of lazyPersist files with missing
 * blocks and unlink them from the namespace.
 */
private void clearCorruptLazyPersistFiles()
    throws SafeModeException, AccessControlException,
    UnresolvedLinkException, IOException {

  BlockStoragePolicy lpPolicy = blockManager.getStoragePolicy("LAZY_PERSIST");

  List<BlockCollection> filesToDelete = new ArrayList<BlockCollection>();

  writeLock();
  try {
    final Iterator<Block> it =
        blockManager.getCorruptReplicaBlockIterator();

    while (it.hasNext()) {
      Block b = it.next();
      BlockInfo blockInfo = blockManager.getStoredBlock(b);
      if (blockInfo.getBlockCollection().getStoragePolicyID()
          == lpPolicy.getId()) {
        filesToDelete.add(blockInfo.getBlockCollection());
      }
    }

    for (BlockCollection bc : filesToDelete) {
      LOG.warn("Removing lazyPersist file " + bc.getName() +
          " with no replicas.");
      deleteInternal(bc.getName(), false, false, false);
    }
  } finally {
    writeUnlock();
  }
}
@Override
public DatanodeDescriptor chooseReplicaToDelete(BlockCollection inode,
    Block block, short replicationFactor,
    Collection<DatanodeDescriptor> first,
    Collection<DatanodeDescriptor> second) {
  Collection<DatanodeDescriptor> chooseFrom =
      !first.isEmpty() ? first : second;

  List<DatanodeDescriptor> l = Lists.newArrayList(chooseFrom);
  return l.get(DFSUtil.getRandom().nextInt(l.size()));
}
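// Both overrides above implement the same test-only policy: prefer the
// first (priority) candidate set when it is non-empty, otherwise fall back
// to the second, then delete a replica chosen uniformly at random. A
// minimal generic sketch of that selection rule (pickRandom is a
// hypothetical helper, not part of the HDFS API; assumes java.util
// Collection, List, ArrayList, and Random):
static <T> T pickRandom(Collection<T> first, Collection<T> second,
    Random rng) {
  // Caller guarantees at least one set is non-empty, as in the overrides.
  Collection<T> chooseFrom = !first.isEmpty() ? first : second;
  List<T> candidates = new ArrayList<>(chooseFrom); // snapshot for indexing
  return candidates.get(rng.nextInt(candidates.size()));
}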
/**
 * Check block information given a blockId number.
 */
public void blockIdCK(String blockId) {

  if (blockId == null) {
    out.println("Please provide valid blockId!");
    return;
  }

  BlockManager bm = namenode.getNamesystem().getBlockManager();
  try {
    // Get the block info.
    Block block = new Block(Block.getBlockId(blockId));
    // Find which file this block belongs to.
    BlockInfoContiguous blockInfo = bm.getStoredBlock(block);
    if (blockInfo == null) {
      out.println("Block " + blockId + " " + NONEXISTENT_STATUS);
      LOG.warn("Block " + blockId + " " + NONEXISTENT_STATUS);
      return;
    }
    BlockCollection bc = bm.getBlockCollection(blockInfo);
    INode iNode = (INode) bc;
    NumberReplicas numberReplicas = bm.countNodes(block);
    out.println("Block Id: " + blockId);
    out.println("Block belongs to: " + iNode.getFullPathName());
    out.println("No. of Expected Replica: " + bc.getBlockReplication());
    out.println("No. of live Replica: " + numberReplicas.liveReplicas());
    out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
    out.println("No. of stale Replica: " +
        numberReplicas.replicasOnStaleNodes());
    out.println("No. of decommission Replica: " +
        numberReplicas.decommissionedReplicas());
    out.println("No. of corrupted Replica: " +
        numberReplicas.corruptReplicas());

    // Record datanodes that have corrupted replicas of this block.
    Collection<DatanodeDescriptor> corruptionRecord = null;
    if (bm.getCorruptReplicas(block) != null) {
      corruptionRecord = bm.getCorruptReplicas(block);
    }

    // Report the replica status on each datanode.
    for (int idx = (blockInfo.numNodes() - 1); idx >= 0; idx--) {
      DatanodeDescriptor dn = blockInfo.getDatanode(idx);
      out.print("Block replica on datanode/rack: " + dn.getHostName() +
          dn.getNetworkLocation() + " ");
      if (corruptionRecord != null && corruptionRecord.contains(dn)) {
        out.print(CORRUPT_STATUS + "\t ReasonCode: " +
            bm.getCorruptReason(block, dn));
      } else if (dn.isDecommissioned()) {
        out.print(DECOMMISSIONED_STATUS);
      } else if (dn.isDecommissionInProgress()) {
        out.print(DECOMMISSIONING_STATUS);
      } else {
        out.print(HEALTHY_STATUS);
      }
      out.print("\n");
    }
  } catch (Exception e) {
    String errMsg = "Fsck on blockId '" + blockId + "'";
    LOG.warn(errMsg, e);
    out.println(e.getMessage());
    out.print("\n\n" + errMsg);
    LOG.warn("Error in looking up block", e);
  }
}
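// blockIdCK prints its report for a single block id. A caller would
// typically loop over a space-separated list of ids; a minimal sketch,
// assuming the ids arrive as one string (checkBlockIds is a hypothetical
// wrapper; the real fsck entry point does its own argument parsing):
void checkBlockIds(String blockIds) {
  for (String blk : blockIds.split(" ")) {
    blockIdCK(blk); // one report per block id
  }
}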
@Test(timeout = 60000)
public void testFsckReplicaDetails() throws Exception {

  final short REPL_FACTOR = 1;
  short NUM_DN = 1;
  final long blockSize = 512;
  final long fileSize = 1024;
  boolean checkDecommissionInProgress = false;
  String[] racks = { "/rack1" };
  String[] hosts = { "host1" };

  Configuration conf = new Configuration();
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

  MiniDFSCluster cluster;
  DistributedFileSystem dfs;
  cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
          .racks(racks).build();
  cluster.waitClusterUp();
  dfs = cluster.getFileSystem();

  // Create a test file.
  final String testFile = "/testfile";
  final Path path = new Path(testFile);
  DFSTestUtil.createFile(dfs, path, fileSize, REPL_FACTOR, 1000L);
  DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR);
  try {
    // Make sure the datanode that holds the replica is healthy
    // before decommissioning it.
    String fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks",
        "-replicaDetails");
    assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
    assertTrue(fsckOut.contains("(LIVE)"));

    // Decommission the datanode.
    ExtendedBlock eb = DFSTestUtil.getFirstBlock(dfs, path);
    FSNamesystem fsn = cluster.getNameNode().getNamesystem();
    BlockManager bm = fsn.getBlockManager();
    BlockCollection bc = null;
    try {
      fsn.writeLock();
      BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock());
      bc = fsn.getBlockCollection(bi);
    } finally {
      fsn.writeUnlock();
    }
    DatanodeDescriptor dn = bc.getBlocks()[0].getDatanode(0);
    bm.getDatanodeManager().getDecomManager().startDecommission(dn);
    String dnName = dn.getXferAddr();

    // Check the replica status while decommissioning.
    fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks",
        "-replicaDetails");
    assertTrue(fsckOut.contains("(DECOMMISSIONING)"));

    // Start a second datanode and wait for decommission to complete.
    cluster.startDataNodes(conf, 1, true, null, null, null);
    DatanodeInfo datanodeInfo = null;
    do {
      Thread.sleep(2000);
      for (DatanodeInfo info : dfs.getDataNodeStats()) {
        if (dnName.equals(info.getXferAddr())) {
          datanodeInfo = info;
        }
      }
      if (!checkDecommissionInProgress && datanodeInfo != null
          && datanodeInfo.isDecommissionInProgress()) {
        checkDecommissionInProgress = true;
      }
    } while (datanodeInfo != null && !datanodeInfo.isDecommissioned());

    // Check the replica status after decommission is done.
    fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks",
        "-replicaDetails");
    assertTrue(fsckOut.contains("(DECOMMISSIONED)"));
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Look up the {@link BlockCollection} (i.e. the owning file) for the
 * given block collection id.
 */
BlockCollection getBlockCollection(long id);
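// Usage sketch for the accessor above, assuming the stored BlockInfo
// exposes the owning collection's id (getBlockCollectionId() is an
// assumption here, not shown in this excerpt): callers resolve a block to
// its owning file through the id rather than a direct object reference.
BlockInfoContiguous blockInfo = blockManager.getStoredBlock(block);
if (blockInfo != null) {
  BlockCollection bc =
      namesystem.getBlockCollection(blockInfo.getBlockCollectionId());
  LOG.info("Block " + block + " belongs to " + bc.getName());
}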