void handleRegistrationError(RemoteException re, InetSocketAddress failedNode) { // If either the primary or standby NN throws these exceptions, this // datanode will exit. I think this is the right behaviour because // the excludes list on both namenode better be the same. String reClass = re.getClassName(); if (failedNode.equals(primaryAddr) && (UnregisteredDatanodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) ) { LOG.warn("Shut down this service: ", re); this.shouldServiceRun = false; } else { LOG.warn(re); } }
/** * The given node is reporting all its blocks. Use this info to * update the (machine-->blocklist) and (block-->machinelist) tables. */ public void processReport(DatanodeID nodeID, BlockListAsLongs newReport ) throws IOException { writeLock(); try { long startTime = now(); if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: " + "from " + nodeID.getName()+" " + newReport.getNumberOfBlocks()+" blocks"); } DatanodeDescriptor node = getDatanode(nodeID); if (node == null || !node.isAlive) { throw new IOException("ProcessReport from dead or unregisterted node: " + nodeID.getName()); } // Check if this datanode should actually be shutdown instead. if (shouldNodeShutdown(node)) { setDatanodeDead(node); throw new DisallowedDatanodeException(node); } blockManager.processReport(node, newReport); NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime)); } finally { writeUnlock(); } }
/** * The given node is reporting that it received a certain block. */ public void blockReceived(DatanodeID nodeID, Block block, String delHint ) throws IOException { writeLock(); try { DatanodeDescriptor node = getDatanode(nodeID); if (node == null || !node.isAlive) { NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block + " is received from dead or unregistered node " + nodeID.getName()); throw new IOException( "Got blockReceived message from unregistered or dead node " + block); } if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: " +block+" is received from " + nodeID.getName()); } // Check if this datanode should actually be shutdown instead. if (shouldNodeShutdown(node)) { setDatanodeDead(node); throw new DisallowedDatanodeException(node); } blockManager.addBlock(node, block, delHint); } finally { writeUnlock(); } }
void handleRegistrationError(RemoteException re) { // If either the primary or standby NN throws these exceptions, this // datanode will exit. I think this is the right behaviour because // the excludes list on both namenode better be the same. String reClass = re.getClassName(); if (UnregisteredDatanodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) { LOG.warn("DataNode is shutting down: ", re); shutdownDN(); } else { LOG.warn(re); } }
/** * The given node has reported in. This method should: * 1) Record the heartbeat, so the datanode isn't timed out * 2) Adjust usage stats for future block allocation * <p/> * If a substantial amount of time passed since the last datanode * heartbeat then request an immediate block report. * * @return an array of datanode commands * @throws IOException */ DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, long capacity, long dfsUsed, long remaining, long namespaceUsed, int xceiverCount, int xmitsInProgress) throws IOException { DatanodeCommand cmd = null; synchronized (heartbeats) { synchronized (datanodeMap) { DatanodeDescriptor nodeinfo = null; try { nodeinfo = getDatanode(nodeReg); } catch (UnregisteredDatanodeException e) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } // Check if this datanode should actually be shutdown instead. if (nodeinfo != null && nodeinfo.isDisallowed()) { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } if (nodeinfo == null || !nodeinfo.isAlive) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } updateStats(nodeinfo, false); nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, namespaceUsed, xceiverCount); updateStats(nodeinfo, true); //check lease recovery cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); if (cmd != null) { return new DatanodeCommand[]{cmd}; } ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2); //check pending replication cmd = nodeinfo.getReplicationCommand(maxReplicationStreams - xmitsInProgress); if (cmd != null) { cmds.add(cmd); } //check block invalidation cmd = nodeinfo.getInvalidateBlocks(ReplicationConfigKeys.blockInvalidateLimit); if (cmd != null) { cmds.add(cmd); } // check raid tasks cmd = nodeinfo.getRaidCommand(ReplicationConfigKeys.raidEncodingTaskLimit, ReplicationConfigKeys.raidDecodingTaskLimit); if (cmd != null) { cmds.add(cmd); } if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } } } //check distributed upgrade cmd = getDistributedUpgradeCommand(); if (cmd != null) { return new DatanodeCommand[]{cmd}; } return null; }
/** * Handle heartbeat from datanodes. */ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, final String blockPoolId, long capacity, long dfsUsed, long remaining, long blockPoolUsed, int xceiverCount, int maxTransfers, int failedVolumes) throws IOException { synchronized (heartbeatManager) { synchronized (datanodeMap) { DatanodeDescriptor nodeinfo = null; try { nodeinfo = getDatanode(nodeReg); } catch (UnregisteredNodeException e) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } // Check if this datanode should actually be shutdown instead. if (nodeinfo != null && nodeinfo.isDisallowed()) { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } if (nodeinfo == null || !nodeinfo.isAlive) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed, remaining, blockPoolUsed, xceiverCount, failedVolumes); //check lease recovery BlockInfoUnderConstruction[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); if (blocks != null) { BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.length); for (BlockInfoUnderConstruction b : blocks) { brCommand.add(new RecoveringBlock(new ExtendedBlock(blockPoolId, b), getDataNodeDescriptorsTx(b), b.getBlockRecoveryId())); } return new DatanodeCommand[]{brCommand}; } final List<DatanodeCommand> cmds = new ArrayList<>(); //check pending replication List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(maxTransfers); if (pendingList != null) { cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList)); } //check block invalidation Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (blks != null) { cmds.add( new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)); } blockManager.addKeyUpdateCommand(cmds, nodeinfo); // check for balancer bandwidth update if (nodeinfo.getBalancerBandwidth() > 0) { cmds.add( new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth())); // set back to 0 to indicate that datanode has been sent the new value nodeinfo.setBalancerBandwidth(0); } if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } } } return new DatanodeCommand[0]; }
/** * The given node has reported in. This method should: * 1) Record the heartbeat, so the datanode isn't timed out * 2) Adjust usage stats for future block allocation * * If a substantial amount of time passed since the last datanode * heartbeat then request an immediate block report. * * @return an array of datanode commands * @throws IOException */ DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, long capacity, long dfsUsed, long remaining, int xceiverCount, int xmitsInProgress) throws IOException { DatanodeCommand cmd = null; synchronized (heartbeats) { synchronized (datanodeMap) { DatanodeDescriptor nodeinfo = null; try { nodeinfo = getDatanode(nodeReg); } catch(UnregisteredDatanodeException e) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } // Check if this datanode should actually be shutdown instead. if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } if (nodeinfo == null || !nodeinfo.isAlive) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } updateStats(nodeinfo, false); nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount); updateStats(nodeinfo, true); //check lease recovery cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); if (cmd != null) { return new DatanodeCommand[] {cmd}; } ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(); //check pending replication cmd = nodeinfo.getReplicationCommand( maxReplicationStreams - xmitsInProgress); if (cmd != null) { cmds.add(cmd); } //check block invalidation cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (cmd != null) { cmds.add(cmd); } // check access key update if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) { cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys())); nodeinfo.needKeyUpdate = false; } // check for balancer bandwidth update if (nodeinfo.getBalancerBandwidth() > 0) { cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth())); // set back to 0 to indicate that datanode has been sent the new value nodeinfo.setBalancerBandwidth(0); } if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } } } //check distributed upgrade cmd = getDistributedUpgradeCommand(); if (cmd != null) { return new DatanodeCommand[] {cmd}; } return null; }
/** * It will update the targets for INodeFileUnderConstruction * * @param nodeID * - DataNode ID * @param blocksBeingWritten * - list of blocks which are still inprogress. * @throws IOException */ public synchronized void processBlocksBeingWrittenReport(DatanodeID nodeID, BlockListAsLongs blocksBeingWritten) throws IOException { DatanodeDescriptor dataNode = getDatanode(nodeID); if (dataNode == null) { throw new IOException("ProcessReport from unregistered node: " + nodeID.getName()); } // Check if this datanode should actually be shutdown instead. if (shouldNodeShutdown(dataNode)) { setDatanodeDead(dataNode); throw new DisallowedDatanodeException(dataNode); } Block block = new Block(); for (int i = 0; i < blocksBeingWritten.getNumberOfBlocks(); i++) { block.set(blocksBeingWritten.getBlockId(i), blocksBeingWritten .getBlockLen(i), blocksBeingWritten.getBlockGenStamp(i)); BlockInfo storedBlock = blocksMap.getStoredBlockWithoutMatchingGS(block); if (storedBlock == null) { rejectAddStoredBlock(new Block(block), dataNode, "Block not in blockMap with any generation stamp"); continue; } INodeFile inode = storedBlock.getINode(); if (inode == null) { rejectAddStoredBlock(new Block(block), dataNode, "Block does not correspond to any file"); continue; } boolean underConstruction = inode.isUnderConstruction(); boolean isLastBlock = inode.getLastBlock() != null && inode.getLastBlock().getBlockId() == block.getBlockId(); // Must be the last block of a file under construction, if (!underConstruction) { rejectAddStoredBlock(new Block(block), dataNode, "Reported as block being written but is a block of closed file."); continue; } if (!isLastBlock) { rejectAddStoredBlock(new Block(block), dataNode, "Reported as block being written but not the last block of " + "an under-construction file."); continue; } INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) inode; pendingFile.addTarget(dataNode); incrementSafeBlockCount(pendingFile.getTargets().length); } }
/** * The given node is reporting that it received a certain block. */ public synchronized void blockReceived(DatanodeID nodeID, Block block, String delHint ) throws IOException { DatanodeDescriptor node = getDatanode(nodeID); if (node == null || !node.isAlive) { NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block + " is received from dead or unregistered node " + nodeID.getName()); throw new IOException( "Got blockReceived message from unregistered or dead node " + block); } if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: " +block+" is received from " + nodeID.getName()); } // Check if this datanode should actually be shutdown instead. if (shouldNodeShutdown(node)) { setDatanodeDead(node); throw new DisallowedDatanodeException(node); } // get the deletion hint node DatanodeDescriptor delHintNode = null; if(delHint!=null && delHint.length()!=0) { delHintNode = datanodeMap.get(delHint); if(delHintNode == null) { NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block + " is expected to be removed from an unrecorded node " + delHint); } } // // Modify the blocks->datanode map and node's map. // pendingReplications.remove(block); addStoredBlock(block, node, delHintNode ); // decrement number of blocks scheduled to this datanode. node.decBlocksScheduled(); }
/** * The given node has reported in. This method should: * 1) Record the heartbeat, so the datanode isn't timed out * 2) Adjust usage stats for future block allocation * <p/> * If a substantial amount of time passed since the last datanode * heartbeat then request an immediate block report. * * @return an array of datanode commands * @throws IOException */ DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, long capacity, long dfsUsed, long remaining, long namespaceUsed, int xceiverCount, int xmitsInProgress) throws IOException { DatanodeCommand cmd = null; synchronized (heartbeats) { synchronized (datanodeMap) { DatanodeDescriptor nodeinfo = null; try { nodeinfo = getDatanode(nodeReg); } catch (UnregisteredDatanodeException e) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } // Check if this datanode should actually be shutdown instead. if (nodeinfo != null && nodeinfo.isDisallowed()) { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } if (nodeinfo == null || !nodeinfo.isAlive) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } updateStats(nodeinfo, false); nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, namespaceUsed, xceiverCount); updateStats(nodeinfo, true); //check lease recovery cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); if (cmd != null) { return new DatanodeCommand[]{cmd}; } ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2); //check pending replication cmd = nodeinfo.getReplicationCommand(maxReplicationStreams - xmitsInProgress); if (cmd != null) { cmds.add(cmd); } //check block invalidation cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (cmd != null) { cmds.add(cmd); } if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } } } //check distributed upgrade cmd = getDistributedUpgradeCommand(); if (cmd != null) { return new DatanodeCommand[]{cmd}; } return null; }
/** * The given node has reported in. This method should: * 1) Record the heartbeat, so the datanode isn't timed out * 2) Adjust usage stats for future block allocation * * If a substantial amount of time passed since the last datanode * heartbeat then request an immediate block report. * * @return an array of datanode commands * @throws IOException */ DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, long capacity, long dfsUsed, long remaining, int xceiverCount, int xmitsInProgress) throws IOException { DatanodeCommand cmd = null; synchronized (heartbeats) { synchronized (datanodeMap) { DatanodeDescriptor nodeinfo = null; try { nodeinfo = getDatanode(nodeReg); } catch(UnregisteredDatanodeException e) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } // Check if this datanode should actually be shutdown instead. if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } if (nodeinfo == null || !nodeinfo.isAlive) { return new DatanodeCommand[]{DatanodeCommand.REGISTER}; } updateStats(nodeinfo, false); nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount); updateStats(nodeinfo, true); //check lease recovery cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); if (cmd != null) { return new DatanodeCommand[] {cmd}; } ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2); //check pending replication cmd = nodeinfo.getReplicationCommand( maxReplicationStreams - xmitsInProgress); if (cmd != null) { cmds.add(cmd); } //check block invalidation cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (cmd != null) { cmds.add(cmd); } if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } } } //check distributed upgrade cmd = getDistributedUpgradeCommand(); if (cmd != null) { return new DatanodeCommand[] {cmd}; } return null; }
/** * The given node is reporting that it received a certain block. */ public synchronized void blockReceived(DatanodeID nodeID, Block block, String delHint ) throws IOException { DatanodeDescriptor node = getDatanode(nodeID); if (node == null) { NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block + " is received from an unrecorded node " + nodeID.getName()); throw new IllegalArgumentException( "Unexpected exception. Got blockReceived message from node " + block + ", but there is no info for it"); } if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: " +block+" is received from " + nodeID.getName()); } // Check if this datanode should actually be shutdown instead. if (shouldNodeShutdown(node)) { setDatanodeDead(node); throw new DisallowedDatanodeException(node); } // decrement number of blocks scheduled to this datanode. node.decBlocksScheduled(); // get the deletion hint node DatanodeDescriptor delHintNode = null; if(delHint!=null && delHint.length()!=0) { delHintNode = datanodeMap.get(delHint); if(delHintNode == null) { NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block + " is expected to be removed from an unrecorded node " + delHint); } } // // Modify the blocks->datanode map and node's map. // pendingReplications.remove(block); addStoredBlock(block, node, delHintNode ); }