/** * Create BlockCommand for transferring blocks to another datanode * @param blocktargetlist blocks to be transferred */ public BlockCommand(int action, String poolId, List<BlockTargetPair> blocktargetlist) { super(action); this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; targetStorageTypes = new StorageType[blocks.length][]; targetStorageIDs = new String[blocks.length][]; for(int i = 0; i < blocks.length; i++) { BlockTargetPair p = blocktargetlist.get(i); blocks[i] = p.block; targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets); targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets); targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets); } }
/** * Create BlockCommand for transferring blocks to another datanode * @param blocktargetlist blocks to be transferred */ public BlockCommand(int action, String poolId, List<BlockTargetPair> blocktargetlist) { super(action); this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; targetStorageIDs = new String[blocks.length][]; for(int i = 0; i < blocks.length; i++) { BlockTargetPair p = blocktargetlist.get(i); blocks[i] = p.block; targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets); targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets); } }
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) { // list for priority 1 List<Block> list_p1 = new ArrayList<Block>(); list_p1.add(block); // list of lists for each priority List<List<Block>> list_all = new ArrayList<List<Block>>(); list_all.add(new ArrayList<Block>()); // for priority 0 list_all.add(list_p1); // for priority 1 assertEquals("Block not initially pending replication", 0, bm.pendingReplications.getNumReplicas(block)); assertEquals( "computeReplicationWork should indicate replication is needed", 1, bm.computeReplicationWorkForBlocks(list_all)); assertTrue("replication is pending after work is computed", bm.pendingReplications.getNumReplicas(block) > 0); LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications(); assertEquals(1, repls.size()); Entry<DatanodeStorageInfo, BlockTargetPair> repl = repls.entries().iterator().next(); DatanodeStorageInfo[] targets = repl.getValue().targets; DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length]; pipeline[0] = repl.getKey(); System.arraycopy(targets, 0, pipeline, 1, targets.length); return pipeline; }
/**
 * Test helper: collects the replication commands currently returned by every
 * node in {@code nodes}.
 *
 * Each node is asked via {@code getReplicationCommand(10)}; a {@code null}
 * answer is skipped. Otherwise the returned pairs are added under every
 * storage reported by that node's {@code getStorageInfos()}.
 *
 * @return multimap from storage to the pairs returned by its node
 */
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> pending =
      LinkedListMultimap.create();
  for (DatanodeDescriptor node : nodes) {
    List<BlockTargetPair> work = node.getReplicationCommand(10);
    if (work == null) {
      continue;
    }
    for (DatanodeStorageInfo storage : node.getStorageInfos()) {
      pending.putAll(storage, work);
    }
  }
  return pending;
}
private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) { // list for priority 1 List<BlockInfo> list_p1 = new ArrayList<>(); list_p1.add(block); // list of lists for each priority List<List<BlockInfo>> list_all = new ArrayList<>(); list_all.add(new ArrayList<BlockInfo>()); // for priority 0 list_all.add(list_p1); // for priority 1 assertEquals("Block not initially pending replication", 0, bm.pendingReplications.getNumReplicas(block)); assertEquals( "computeBlockRecoveryWork should indicate replication is needed", 1, bm.computeRecoveryWorkForBlocks(list_all)); assertTrue("replication is pending after work is computed", bm.pendingReplications.getNumReplicas(block) > 0); LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications(); assertEquals(1, repls.size()); Entry<DatanodeStorageInfo, BlockTargetPair> repl = repls.entries().iterator().next(); DatanodeStorageInfo[] targets = repl.getValue().targets; DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length]; pipeline[0] = repl.getKey(); System.arraycopy(targets, 0, pipeline, 1, targets.length); return pipeline; }
/** * Create BlockCommand for transferring blocks to another datanode * @param blocktargetlist blocks to be transferred */ public BlockCommand(int action, String poolId, List<BlockTargetPair> blocktargetlist) { super(action); this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; for(int i = 0; i < blocks.length; i++) { BlockTargetPair p = blocktargetlist.get(i); blocks[i] = p.block; targets[i] = p.targets; } }
private DatanodeDescriptor[] scheduleSingleReplication(Block block) { // list for priority 1 List<Block> list_p1 = new ArrayList<Block>(); list_p1.add(block); // list of lists for each priority List<List<Block>> list_all = new ArrayList<List<Block>>(); list_all.add(new ArrayList<Block>()); // for priority 0 list_all.add(list_p1); // for priority 1 assertEquals("Block not initially pending replication", 0, bm.pendingReplications.getNumReplicas(block)); assertEquals( "computeReplicationWork should indicate replication is needed", 1, bm.computeReplicationWorkForBlocks(list_all)); assertTrue("replication is pending after work is computed", bm.pendingReplications.getNumReplicas(block) > 0); LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications(); assertEquals(1, repls.size()); Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries().iterator().next(); DatanodeDescriptor[] targets = repl.getValue().targets; DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length]; pipeline[0] = repl.getKey(); System.arraycopy(targets, 0, pipeline, 1, targets.length); return pipeline; }
/**
 * Test helper: collects the replication commands currently returned by every
 * node in {@code nodes}.
 *
 * Each node is asked via {@code getReplicationCommand(10)}; a {@code null}
 * answer is skipped, otherwise the returned pairs are added under that node.
 *
 * @return multimap from node to the pairs it returned
 */
private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> pending =
      LinkedListMultimap.create();
  for (DatanodeDescriptor node : nodes) {
    List<BlockTargetPair> work = node.getReplicationCommand(10);
    if (work == null) {
      continue;
    }
    pending.putAll(node, work);
  }
  return pending;
}
/** * Create BlockCommand for transferring blocks to another datanode * * @param blocktargetlist * blocks to be transferred */ public BlockCommand(int action, String poolId, List<BlockTargetPair> blocktargetlist) { super(action); this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; for (int i = 0; i < blocks.length; i++) { BlockTargetPair p = blocktargetlist.get(i); blocks[i] = p.block; targets[i] = p.targets; } }
/** Handle heartbeat from datanodes. */ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, StorageReport[] reports, final String blockPoolId, long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers, int failedVolumes, VolumeFailureSummary volumeFailureSummary) throws IOException { final DatanodeDescriptor nodeinfo; try { nodeinfo = getDatanode(nodeReg); } catch (UnregisteredNodeException e) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } // Check if this datanode should actually be shutdown instead. if (nodeinfo != null && nodeinfo.isDisallowed()) { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } if (nodeinfo == null || !nodeinfo.isRegistered()) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity, cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary); // If we are in safemode, do not send back any recovery / replication // requests. Don't even drain the existing queue of work. 
if (namesystem.isInSafeMode()) { return new DatanodeCommand[0]; } // block recovery command final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId, nodeinfo); if (brCommand != null) { return new DatanodeCommand[]{brCommand}; } final List<DatanodeCommand> cmds = new ArrayList<>(); // check pending replication List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand( maxTransfers); if (pendingList != null) { cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList)); } // check pending erasure coding tasks List<BlockECRecoveryInfo> pendingECList = nodeinfo.getErasureCodeCommand( maxTransfers); if (pendingECList != null) { cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY, pendingECList)); } // check block invalidation Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (blks != null) { cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)); } // cache commands addCacheCommands(blockPoolId, nodeinfo, cmds); // key update command blockManager.addKeyUpdateCommand(cmds, nodeinfo); // check for balancer bandwidth update if (nodeinfo.getBalancerBandwidth() > 0) { cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth())); // set back to 0 to indicate that datanode has been sent the new value nodeinfo.setBalancerBandwidth(0); } if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } return new DatanodeCommand[0]; }
/** * Handle heartbeat from datanodes. */ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, final String blockPoolId, long capacity, long dfsUsed, long remaining, long blockPoolUsed, int xceiverCount, int maxTransfers, int failedVolumes) throws IOException { synchronized (heartbeatManager) { synchronized (datanodeMap) { DatanodeDescriptor nodeinfo = null; try { nodeinfo = getDatanode(nodeReg); } catch (UnregisteredNodeException e) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } // Check if this datanode should actually be shutdown instead. if (nodeinfo != null && nodeinfo.isDisallowed()) { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } if (nodeinfo == null || !nodeinfo.isAlive) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed, remaining, blockPoolUsed, xceiverCount, failedVolumes); //check lease recovery BlockInfoUnderConstruction[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); if (blocks != null) { BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.length); for (BlockInfoUnderConstruction b : blocks) { brCommand.add(new RecoveringBlock(new ExtendedBlock(blockPoolId, b), getDataNodeDescriptorsTx(b), b.getBlockRecoveryId())); } return new DatanodeCommand[]{brCommand}; } final List<DatanodeCommand> cmds = new ArrayList<>(); //check pending replication List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(maxTransfers); if (pendingList != null) { cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList)); } //check block invalidation Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (blks != null) { cmds.add( new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)); } blockManager.addKeyUpdateCommand(cmds, nodeinfo); // check for balancer bandwidth update if (nodeinfo.getBalancerBandwidth() > 0) { cmds.add( new 
BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth())); // set back to 0 to indicate that datanode has been sent the new value nodeinfo.setBalancerBandwidth(0); } if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } } } return new DatanodeCommand[0]; }