/**
 * Configures this placement policy from the given configuration and stores the
 * collaborators supplied by the caller.
 *
 * @param conf configuration the consider-load flag, heartbeat interval,
 *        tolerated heartbeat multiplier and stale-datanode interval are read from
 * @param stats cluster statistics object kept in {@code this.stats}
 * @param clusterMap network topology kept in {@code this.clusterMap}
 * @param host2datanodeMap host-to-nodes map kept in {@code this.host2datanodeMap}
 */
@Override
public void initialize(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
  // Consider-load falls back to true when the key is not configured.
  final boolean loadAware = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
  this.considerLoad = loadAware;

  this.stats = stats;
  this.clusterMap = clusterMap;
  this.host2datanodeMap = host2datanodeMap;

  // Configured value is scaled by 1000 (presumably seconds -> milliseconds; confirm).
  final long configuredHeartbeat = conf.getLong(
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  this.heartbeatInterval = configuredHeartbeat * 1000;

  final int heartbeatTolerance = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
      DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
  this.tolerateHeartbeatMultiplier = heartbeatTolerance;

  final long configuredStaleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
  this.staleInterval = configuredStaleInterval;
}
/**
 * Configures this placement policy from the given configuration and stores the
 * cluster statistics and topology supplied by the caller.
 *
 * @param conf configuration the consider-load flag, heartbeat interval,
 *        tolerated heartbeat multiplier and stale-datanode interval are read from
 * @param stats cluster statistics object kept in {@code this.stats}
 * @param clusterMap network topology kept in {@code this.clusterMap}
 */
@Override
public void initialize(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap) {
  // Consider-load falls back to true when the key is not configured.
  final boolean loadAware = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
  this.considerLoad = loadAware;

  this.stats = stats;
  this.clusterMap = clusterMap;

  // Configured value is scaled by 1000 (presumably seconds -> milliseconds; confirm).
  final long configuredHeartbeat = conf.getLong(
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  this.heartbeatInterval = configuredHeartbeat * 1000;

  final int heartbeatTolerance = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
      DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
  this.tolerateHeartbeatMultiplier = heartbeatTolerance;

  final long configuredStaleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
  this.staleInterval = configuredStaleInterval;
}
/**
 * Sets up the policy: remembers the cluster statistics and topology, and pulls
 * its tunables out of the configuration.
 *
 * @param conf source of the consider-load flag, heartbeat interval, tolerated
 *        heartbeat multiplier and stale-datanode interval
 * @param stats cluster statistics object assigned to {@code this.stats}
 * @param clusterMap network topology assigned to {@code this.clusterMap}
 */
@Override
public void initialize(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap) {
  // Defaults to true when the consider-load key is absent.
  final boolean considerLoadSetting = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
  this.considerLoad = considerLoadSetting;

  this.stats = stats;
  this.clusterMap = clusterMap;

  // The raw setting is multiplied by 1000 (presumably seconds -> ms; confirm).
  final long heartbeatSetting = conf.getLong(
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  this.heartbeatInterval = heartbeatSetting * 1000;

  final int multiplierSetting = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
      DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
  this.tolerateHeartbeatMultiplier = multiplierSetting;

  final long staleSetting = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
  this.staleInterval = staleSetting;
}
/**
 * Initializes the parent policy, then prepares this policy's random source and
 * its {@code ServerLookup}.
 *
 * <p>When {@code host2datanodeMap} is {@code null} the shared {@code DEFAULT}
 * lookup is used. Otherwise the class configured under
 * {@code BLUR_BLOCK_PLACEMENT_SERVER_LOOKUP} (falling back to
 * {@code DefaultServerLookup}) is instantiated reflectively through its
 * {@code (Configuration, Host2NodesMap)} constructor.
 *
 * @param conf configuration passed to the parent and to the lookup constructor
 * @param stats forwarded to the parent implementation
 * @param clusterMap forwarded to the parent implementation
 * @param host2datanodeMap forwarded to the parent; also decides which lookup is used
 * @throws RuntimeException wrapping any exception raised while locating or
 *         invoking the lookup constructor, or while casting its result
 */
@Override
public void initialize(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
  LOG.info("initialize");
  super.initialize(conf, stats, clusterMap, host2datanodeMap);
  _random = new Random();

  // Resolved up front, exactly as before, even if the default lookup ends up being used.
  Class<?> lookupClass = conf.getClass(BLUR_BLOCK_PLACEMENT_SERVER_LOOKUP, DefaultServerLookup.class);

  if (host2datanodeMap == null) {
    _serverLookup = DEFAULT;
    return;
  }

  try {
    Constructor<?> lookupCtor = lookupClass.getConstructor(Configuration.class, Host2NodesMap.class);
    Object created = lookupCtor.newInstance(conf, host2datanodeMap);
    _serverLookup = (ServerLookup) created;
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
/**
 * Creates and initializes the Block Placement Policy named by the configuration
 * property {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
 *
 * @param conf the configuration to be used
 * @param stats an object that is used to retrieve the load on the cluster
 * @param clusterMap the network topology of the cluster
 * @param host2datanodeMap host-to-nodes map handed to the policy's
 *        {@code initialize} call
 * @return an initialized instance of BlockPlacementPolicy
 */
public static BlockPlacementPolicy getInstance(Configuration conf,
    FSClusterStats stats, NetworkTopology clusterMap,
    Host2NodesMap host2datanodeMap) {
  // Look up the implementation class, falling back to the configured default.
  final Class<? extends BlockPlacementPolicy> policyClass = conf.getClass(
      DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
      DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
      BlockPlacementPolicy.class);

  final BlockPlacementPolicy policy =
      ReflectionUtils.newInstance(policyClass, conf);
  policy.initialize(conf, stats, clusterMap, host2datanodeMap);
  return policy;
}
/**
 * Creates and initializes the Block Placement Policy named by the
 * configuration parameter {@code dfs.block.replicator.classname}; when the
 * parameter is unset, {@code BlockPlacementPolicyDefault} is used.
 *
 * @param conf the configuration to be used
 * @param stats an object that is used to retrieve the load on the cluster
 * @param clusterMap the network topology of the cluster
 * @return an initialized instance of BlockPlacementPolicy
 */
public static BlockPlacementPolicy getInstance(Configuration conf,
    FSClusterStats stats, NetworkTopology clusterMap) {
  Class<? extends BlockPlacementPolicy> policyClass = conf.getClass(
      "dfs.block.replicator.classname",
      BlockPlacementPolicyDefault.class,
      BlockPlacementPolicy.class);

  // The class token is already bounded by BlockPlacementPolicy, so no cast is needed.
  BlockPlacementPolicy policy = ReflectionUtils.newInstance(policyClass, conf);
  policy.initialize(conf, stats, clusterMap);
  return policy;
}
/**
 * Creates and initializes the Block Placement Policy named by the configuration
 * property {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
 *
 * @param conf the configuration to be used
 * @param stats an object that is used to retrieve the load on the cluster
 * @param clusterMap the network topology of the cluster
 * @return an initialized instance of BlockPlacementPolicy
 */
public static BlockPlacementPolicy getInstance(Configuration conf,
    FSClusterStats stats, NetworkTopology clusterMap) {
  // Look up the implementation class, falling back to the configured default.
  final Class<? extends BlockPlacementPolicy> policyClass = conf.getClass(
      DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
      DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
      BlockPlacementPolicy.class);

  final BlockPlacementPolicy policy =
      ReflectionUtils.newInstance(policyClass, conf);
  policy.initialize(conf, stats, clusterMap);
  return policy;
}
/**
 * Creates the policy and immediately runs {@code initialize} on it.
 *
 * <p>NOTE(review): {@code datanodeManager} is never read here, and the
 * {@code host2datanodeMap} passed to {@code initialize} is not a constructor
 * parameter. It is presumably an inherited field that has not been assigned
 * at this point; confirm whether it should instead be obtained from
 * {@code datanodeManager}.
 *
 * @param conf configuration forwarded to {@code initialize}
 * @param stats cluster statistics forwarded to {@code initialize}
 * @param clusterMap network topology forwarded to {@code initialize}
 * @param datanodeManager currently unused
 */
protected BlockPlacementPolicyWithNodeGroup(Configuration conf,
    FSClusterStats stats, NetworkTopology clusterMap,
    DatanodeManager datanodeManager) {
  this.initialize(conf, stats, clusterMap, host2datanodeMap);
}
/**
 * Initializes this policy by delegating, with unchanged arguments, to the
 * superclass implementation.
 *
 * @param conf configuration forwarded to the superclass
 * @param stats cluster statistics forwarded to the superclass
 * @param clusterMap network topology forwarded to the superclass
 * @param host2datanodeMap host-to-nodes map forwarded to the superclass
 */
public void initialize(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
  // No subclass-specific setup; everything is handled by the parent.
  super.initialize(
      conf,
      stats,
      clusterMap,
      host2datanodeMap);
}
public BlockManager(final Namesystem namesystem, final FSClusterStats stats, final Configuration conf) throws IOException { this.namesystem = namesystem; datanodeManager = new DatanodeManager(this, namesystem, conf); heartbeatManager = datanodeManager.getHeartbeatManager(); invalidateBlocks = new InvalidateBlocks(datanodeManager); // Compute the map capacity by allocating 2% of total memory blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR); blockplacement = BlockPlacementPolicy.getInstance( conf, stats, datanodeManager.getNetworkTopology()); pendingReplications = new PendingReplicationBlocks(conf.getInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); blockTokenSecretManager = createBlockTokenSecretManager(conf); this.maxCorruptFilesReturned = conf.getInt( DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY, DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED); this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT); final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT); final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); if (minR <= 0) throw new IOException("Unexpected configuration parameters: " + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY + " = " + minR + " <= 0"); if (maxR > Short.MAX_VALUE) throw new IOException("Unexpected configuration parameters: " + DFSConfigKeys.DFS_REPLICATION_MAX_KEY + " = " + maxR + " > " + Short.MAX_VALUE); if (minR > maxR) throw new IOException("Unexpected configuration parameters: " + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY + " = " + minR + " > " + DFSConfigKeys.DFS_REPLICATION_MAX_KEY + " = " + maxR); this.minReplication = (short)minR; this.maxReplication = (short)maxR; this.maxReplicationStreams = 
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT); this.replicationStreamsHardLimit = conf.getInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT); this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false : true; this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf); this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf); this.replicationRecheckInterval = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; this.encryptDataTransfer = conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); LOG.info("defaultReplication = " + defaultReplication); LOG.info("maxReplication = " + maxReplication); LOG.info("minReplication = " + minReplication); LOG.info("maxReplicationStreams = " + maxReplicationStreams); LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks); LOG.info("replicationRecheckInterval = " + replicationRecheckInterval); LOG.info("encryptDataTransfer = " + encryptDataTransfer); }
/**
 * Creates the default placement policy and immediately initializes it with
 * the supplied arguments.
 *
 * @param conf configuration forwarded to {@code initialize}
 * @param stats cluster statistics forwarded to {@code initialize}
 * @param clusterMap network topology forwarded to {@code initialize}
 */
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap) {
  this.initialize(conf, stats, clusterMap);
}
/**
 * Creates the node-group placement policy and immediately initializes it with
 * the supplied arguments.
 *
 * @param conf configuration forwarded to {@code initialize}
 * @param stats cluster statistics forwarded to {@code initialize}
 * @param clusterMap network topology forwarded to {@code initialize}
 */
BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap) {
  this.initialize(conf, stats, clusterMap);
}
/**
 * Initializes this policy by delegating, with unchanged arguments, to the
 * superclass implementation.
 *
 * @param conf configuration forwarded to the superclass
 * @param stats cluster statistics forwarded to the superclass
 * @param clusterMap network topology forwarded to the superclass
 */
public void initialize(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap) {
  // No subclass-specific setup; everything is handled by the parent.
  super.initialize(
      conf,
      stats,
      clusterMap);
}
/**
 * Creates the default placement policy and immediately initializes it with
 * the supplied arguments.
 *
 * @param conf configuration forwarded to {@code initialize}
 * @param stats cluster statistics forwarded to {@code initialize}
 * @param clusterMap network topology forwarded to {@code initialize}
 */
protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
    NetworkTopology clusterMap) {
  this.initialize(conf, stats, clusterMap);
}
/**
 * Creates the node-group placement policy and immediately initializes it with
 * the supplied arguments.
 *
 * @param conf configuration forwarded to {@code initialize}
 * @param stats cluster statistics forwarded to {@code initialize}
 * @param clusterMap network topology forwarded to {@code initialize}
 */
protected BlockPlacementPolicyWithNodeGroup(Configuration conf,
    FSClusterStats stats, NetworkTopology clusterMap) {
  this.initialize(conf, stats, clusterMap);
}
/**
 * Instantiates the Block Placement Policy selected by the configuration
 * parameter {@code dfs.block.replicator.classname} and initializes it.
 * {@code BlockPlacementPolicyDefault} is the fallback when the parameter is
 * not set.
 *
 * @param conf
 *          the configuration to be used
 * @param stats
 *          an object that is used to retrieve the load on the cluster
 * @param clusterMap
 *          the network topology of the cluster
 * @return an initialized instance of BlockPlacementPolicy
 */
public static BlockPlacementPolicy getInstance(Configuration conf,
    FSClusterStats stats, NetworkTopology clusterMap) {
  Class<? extends BlockPlacementPolicy> configuredClass = conf.getClass(
      "dfs.block.replicator.classname",
      BlockPlacementPolicyDefault.class,
      BlockPlacementPolicy.class);

  // The class token is already bounded by BlockPlacementPolicy, so no cast is needed.
  BlockPlacementPolicy instance =
      ReflectionUtils.newInstance(configuredClass, conf);
  instance.initialize(conf, stats, clusterMap);
  return instance;
}
/**
 * Sets up a BlockPlacementPolicy object. Every implementation of
 * BlockPlacementPolicy must define this.
 *
 * @param conf the configuration object
 * @param stats retrieve cluster status from here
 * @param clusterMap cluster topology
 * @param host2datanodeMap host-to-nodes map made available to the policy
 *        (exact semantics not visible here; confirm against implementations)
 */
protected abstract void initialize(
    Configuration conf,
    FSClusterStats stats,
    NetworkTopology clusterMap,
    Host2NodesMap host2datanodeMap);
/**
 * Sets up a BlockPlacementPolicy object. Every implementation of
 * BlockPlacementPolicy must define this.
 *
 * @param conf the configuration object
 * @param stats retrieve cluster status from here
 * @param clusterMap cluster topology
 */
protected abstract void initialize(
    Configuration conf,
    FSClusterStats stats,
    NetworkTopology clusterMap);
/**
 * Prepares a BlockPlacementPolicy object for use. All implementations of
 * BlockPlacementPolicy are expected to define this.
 *
 * @param conf
 *          the configuration object
 * @param stats
 *          retrieve cluster status from here
 * @param clusterMap
 *          cluster topology
 */
protected abstract void initialize(
    Configuration conf,
    FSClusterStats stats,
    NetworkTopology clusterMap);