HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager, final Configuration conf) { this.namesystem = namesystem; this.blockManager = blockManager; boolean avoidStaleDataNodesForWrite = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); long recheckInterval = conf.getInt( DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min long staleInterval = conf.getLong( DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) { this.heartbeatRecheckInterval = staleInterval; LOG.info("Setting heartbeat recheck interval to " + staleInterval + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY + " is less than " + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY); } else { this.heartbeatRecheckInterval = recheckInterval; } }
@Test public void testHeartbeatStopWatch() throws Exception { Namesystem ns = Mockito.mock(Namesystem.class); BlockManager bm = Mockito.mock(BlockManager.class); Configuration conf = new Configuration(); long recheck = 2000; conf.setLong( DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck); HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf); monitor.restartHeartbeatStopWatch(); assertFalse(monitor.shouldAbortHeartbeatCheck(0)); // sleep shorter than recheck and verify shouldn't abort Thread.sleep(100); assertFalse(monitor.shouldAbortHeartbeatCheck(0)); // sleep longer than recheck and verify should abort unless ignore delay Thread.sleep(recheck); assertTrue(monitor.shouldAbortHeartbeatCheck(0)); assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck*3)); // ensure it resets properly monitor.restartHeartbeatStopWatch(); assertFalse(monitor.shouldAbortHeartbeatCheck(0)); }
HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager, final Configuration conf) { this.namesystem = namesystem; this.blockManager = blockManager; boolean avoidStaleDataNodesForWrite = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); long recheckInterval = conf.getInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min long staleInterval = conf.getLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) { this.heartbeatRecheckInterval = staleInterval; LOG.info( "Setting heartbeat recheck interval to " + staleInterval + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY + " is less than " + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY); } else { this.heartbeatRecheckInterval = recheckInterval; } }
/** * Constructor for masters. */ public NameNodeBlockTokenSecretManager(long keyUpdateInterval, long tokenLifetime, String blockPoolId, String encryptionAlgorithm, Namesystem namesystem) throws IOException { super(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm); this.namesystem = namesystem; this.setSerialNo(new SecureRandom().nextInt()); if (isLeader()) { // TODO[Hooman]: Since Master is keeping the serialNo locally, so whenever // A namenode crashes it should remove all keys from the database. this.generateKeys(); } else { retrieveBlockKeys(); } }
DecommissionManager(final Namesystem namesystem, final BlockManager blockManager, final HeartbeatManager hbManager) { this.namesystem = namesystem; this.blockManager = blockManager; this.hbManager = hbManager; executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d") .setDaemon(true).build()); decomNodeBlocks = new TreeMap<>(); pendingNodes = new LinkedList<>(); }
BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem, Configuration conf) { this.blockManager = blockManager; this.namesystem = namesystem; this.haEnabled = namesystem.isHaEnabled(); this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT); if (this.threshold > 1.0) { LOG.warn("The threshold value should't be greater than 1, threshold: {}", threshold); } this.datanodeThreshold = conf.getInt( DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT); int minReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); // DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting, // setting this lower than the min replication is not recommended // and/or dangerous for production setups. // When it's unset, safeReplication will use dfs.namenode.replication.min this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY, minReplication); // default to safe mode threshold (i.e., don't populate queues before // leaving safe mode) this.replQueueThreshold = conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, (float) threshold); this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0); LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, threshold); LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, datanodeThreshold); LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, extension); }
@Test(timeout = 60000) public void testupdateNeededReplicationsDoesNotCauseSkippedReplication() throws IOException { Namesystem mockNS = mock(Namesystem.class); when(mockNS.hasReadLock()).thenReturn(true); BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 0, 1, 1); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block2, 0, 0, 1, 1); List<List<BlockInfo>> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0); bm.setReplication((short)0, (short)1, block1); // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. // This block remains and should not be skipped over. chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0); }
@Test(timeout = 60000) public void testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication() throws IOException { Namesystem mockNS = mock(Namesystem.class); when(mockNS.hasWriteLock()).thenReturn(true); BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; long blkID1 = ThreadLocalRandom.current().nextLong(); if (blkID1 < 0) { blkID1 *= -1; } long blkID2 = ThreadLocalRandom.current().nextLong(); if (blkID2 < 0) { blkID2 *= -1; } BlockInfo block1 = genBlockInfo(blkID1); BlockInfo block2 = genBlockInfo(blkID2); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 0, 1, 1); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block2, 0, 0, 1, 1); List<List<BlockInfo>> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0); final BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1); final BlockCollection mbc = mock(BlockCollection.class); when(mbc.getId()).thenReturn(1000L); when(mbc.getLastBlock()).thenReturn(info); when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1); when(mbc.isUnderConstruction()).thenReturn(true); ContentSummary cs = mock(ContentSummary.class); when(cs.getLength()).thenReturn((long)1); when(mbc.computeContentSummary(bm.getStoragePolicySuite())).thenReturn(cs); info.setBlockCollectionId(1000); bm.addBlockCollection(info, mbc); DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo( dataNodes[0], new DatanodeStorage("s1"))}; info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, storageAry); DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class); DatanodeDescriptor dn = mock(DatanodeDescriptor.class); when(dn.isDecommissioned()).thenReturn(true); when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL); when(storage.getDatanodeDescriptor()).thenReturn(dn); when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true); when(storage.addBlock(any(BlockInfo.class))).thenReturn (DatanodeStorageInfo.AddBlockResult.ADDED); info.addStorage(storage, info); BlockInfo lastBlk = mbc.getLastBlock(); when(mbc.getLastBlock()).thenReturn(lastBlk, info); bm.convertLastBlockToUnderConstruction(mbc, 0L); // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. // This block remains and should not be skipped over. chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0); }
DecommissionManager(final Namesystem namesystem, final BlockManager blockmanager) { this.namesystem = namesystem; this.blockmanager = blockmanager; }
public BlockManager(final Namesystem namesystem, final FSClusterStats stats, final Configuration conf) throws IOException { this.namesystem = namesystem; datanodeManager = new DatanodeManager(this, namesystem, conf); heartbeatManager = datanodeManager.getHeartbeatManager(); invalidateBlocks = new InvalidateBlocks(datanodeManager); // Compute the map capacity by allocating 2% of total memory blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR); blockplacement = BlockPlacementPolicy.getInstance( conf, stats, datanodeManager.getNetworkTopology()); pendingReplications = new PendingReplicationBlocks(conf.getInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); blockTokenSecretManager = createBlockTokenSecretManager(conf); this.maxCorruptFilesReturned = conf.getInt( DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY, DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED); this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT); final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT); final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); if (minR <= 0) throw new IOException("Unexpected configuration parameters: " + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY + " = " + minR + " <= 0"); if (maxR > Short.MAX_VALUE) throw new IOException("Unexpected configuration parameters: " + DFSConfigKeys.DFS_REPLICATION_MAX_KEY + " = " + maxR + " > " + Short.MAX_VALUE); if (minR > maxR) throw new IOException("Unexpected configuration parameters: " + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY + " = " + minR + " > " + DFSConfigKeys.DFS_REPLICATION_MAX_KEY + " = " + maxR); this.minReplication = (short)minR; this.maxReplication = (short)maxR; this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT); this.replicationStreamsHardLimit = conf.getInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT); this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false : true; this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf); this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf); this.replicationRecheckInterval = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; this.encryptDataTransfer = conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); LOG.info("defaultReplication = " + defaultReplication); LOG.info("maxReplication = " + maxReplication); LOG.info("minReplication = " + minReplication); LOG.info("maxReplicationStreams = " + maxReplicationStreams); LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks); LOG.info("replicationRecheckInterval = " + replicationRecheckInterval); LOG.info("encryptDataTransfer = " + encryptDataTransfer); }
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem, final Configuration conf) throws IOException { this.namesystem = namesystem; this.blockManager = blockManager; this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf); networktopology = NetworkTopology.getInstance(conf); this.defaultXferPort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort(); this.defaultInfoPort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort(); this.defaultIpcPort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort(); try { this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""), conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, "")); } catch (IOException e) { LOG.error("error reading hosts files: ", e); } this.dnsToSwitchMapping = ReflectionUtils.newInstance( conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, ScriptBasedMapping.class, DNSToSwitchMapping.class), conf); // If the dns to switch mapping supports cache, resolve network // locations of those hosts in the include list and store the mapping // in the cache; so future calls to resolve will be fast. if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { final ArrayList<String> locations = new ArrayList<String>(); for (Entry entry : hostFileManager.getIncludes()) { if (!entry.getIpAddress().isEmpty()) { locations.add(entry.getIpAddress()); } } dnsToSwitchMapping.resolve(locations); }; final long heartbeatIntervalSeconds = conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); final int heartbeatRecheckInterval = conf.getInt( DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds; final int blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds), DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT); this.blockInvalidateLimit = conf.getInt( DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit); LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit); this.avoidStaleDataNodesForRead = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT); this.avoidStaleDataNodesForWrite = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval); this.ratioUseStaleDataNodesForWrite = conf.getFloat( DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY, DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT); Preconditions.checkArgument( (ratioUseStaleDataNodesForWrite > 0 && ratioUseStaleDataNodesForWrite <= 1.0f), DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY + " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " + "It should be a positive non-zero float value, not greater than 1.0f."); }
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem, final Configuration conf) throws IOException { this.namesystem = namesystem; this.blockManager = blockManager; this.networktopology = NetworkTopology.getInstance(conf); this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf); this.hostsReader = new HostsFileReader(conf.get(DFSConfigKeys.DFS_HOSTS, ""), conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, "")); this.dnsToSwitchMapping = ReflectionUtils.newInstance( conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, ScriptBasedMapping.class, DNSToSwitchMapping.class), conf); // If the dns to switch mapping supports cache, resolve network // locations of those hosts in the include list and store the mapping // in the cache; so future calls to resolve will be fast. if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { dnsToSwitchMapping.resolve(new ArrayList<>(hostsReader.getHosts())); } final long heartbeatIntervalSeconds = conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); final int heartbeatRecheckInterval = conf.getInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds; final int blockInvalidateLimit = Math.max(20 * (int) (heartbeatIntervalSeconds), DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT); this.blockInvalidateLimit = conf.getInt(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit); LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit); this.avoidStaleDataNodesForRead = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT); this.avoidStaleDataNodesForWrite = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval); this.ratioUseStaleDataNodesForWrite = conf.getFloat( DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY, DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT); Preconditions.checkArgument((ratioUseStaleDataNodesForWrite > 0 && ratioUseStaleDataNodesForWrite <= 1.0f), DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY + " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " + "It should be a positive non-zero float value, not greater than 1.0f."); this.storageIdMap = new StorageIdMap(); }
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem, final Configuration conf) throws IOException { this.namesystem = namesystem; this.blockManager = blockManager; this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf); networktopology = NetworkTopology.getInstance(conf); this.defaultXferPort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort(); this.defaultInfoPort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT)).getPort(); this.defaultInfoSecurePort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort(); this.defaultIpcPort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort(); try { this.hostFileManager.refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""), conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, "")); } catch (IOException e) { LOG.error("error reading hosts files: ", e); } this.dnsToSwitchMapping = ReflectionUtils.newInstance( conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, ScriptBasedMapping.class, DNSToSwitchMapping.class), conf); // If the dns to switch mapping supports cache, resolve network // locations of those hosts in the include list and store the mapping // in the cache; so future calls to resolve will be fast. if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { final ArrayList<String> locations = new ArrayList<String>(); for (Entry entry : hostFileManager.getIncludes()) { if (!entry.getIpAddress().isEmpty()) { locations.add(entry.getIpAddress()); } } dnsToSwitchMapping.resolve(locations); }; final long heartbeatIntervalSeconds = conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); final int heartbeatRecheckInterval = conf.getInt( DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds; final int blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds), DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT); this.blockInvalidateLimit = conf.getInt( DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit); LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit); this.avoidStaleDataNodesForRead = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT); this.avoidStaleDataNodesForWrite = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval); this.ratioUseStaleDataNodesForWrite = conf.getFloat( DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY, DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT); Preconditions.checkArgument( (ratioUseStaleDataNodesForWrite > 0 && ratioUseStaleDataNodesForWrite <= 1.0f), DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY + " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " + "It should be a positive non-zero float value, not greater than 1.0f."); }