/** * An FSDataset has a directory where it loads its data files. */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { this.fsRunning = true; this.datanode = datanode; this.dataStorage = storage; this.conf = conf; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. final int volFailuresTolerated = conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf); List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos( dataLocations, storage); int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsFailed = volumeFailureInfos.size(); this.validVolsRequired = volsConfigured - volFailuresTolerated; if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { throw new DiskErrorException("Invalid volume failure " + " config value: " + volFailuresTolerated); } if (volsFailed > volFailuresTolerated) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + storage.getNumStorageDirs() + ", volumes configured: " + volsConfigured + ", volumes failed: " + volsFailed + ", volume failures tolerated: " + volFailuresTolerated); } storageMap = new ConcurrentHashMap<String, DatanodeStorage>(); volumeMap = new ReplicaMap(this); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode, this); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); deletingBlock = new HashMap<String, Set<Long>>(); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { addVolume(dataLocations, storage.getStorageDir(idx)); } setupAsyncLazyPersistThreads(); cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. lazyWriter = new Daemon(new LazyWriter(conf)); lazyWriter.start(); registerMBean(datanode.getDatanodeUuid()); localFS = FileSystem.getLocal(conf); blockPinningEnabled = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT); }
/** * An FSDataset has a directory where it loads its data files. */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { this.fsRunning = true; this.datanode = datanode; this.dataStorage = storage; this.conf = conf; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. volFailuresTolerated = conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf); List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos( dataLocations, storage); int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsFailed = volumeFailureInfos.size(); if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { throw new DiskErrorException("Invalid volume failure " + " config value: " + volFailuresTolerated); } if (volsFailed > volFailuresTolerated) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + storage.getNumStorageDirs() + ", volumes configured: " + volsConfigured + ", volumes failed: " + volsFailed + ", volume failures tolerated: " + volFailuresTolerated); } storageMap = new ConcurrentHashMap<String, DatanodeStorage>(); volumeMap = new ReplicaMap(this); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { addVolume(dataLocations, storage.getStorageDir(idx)); } setupAsyncLazyPersistThreads(); cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. lazyWriter = new Daemon(new LazyWriter(conf)); lazyWriter.start(); registerMBean(datanode.getDatanodeUuid()); }
/** * An FSDataset has a directory where it loads its data files. */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { this.datanode = datanode; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. final int volFailuresTolerated = conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsFailed = volsConfigured - storage.getNumStorageDirs(); this.validVolsRequired = volsConfigured - volFailuresTolerated; if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { throw new DiskErrorException("Invalid volume failure " + " config value: " + volFailuresTolerated); } if (volsFailed > volFailuresTolerated) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + storage.getNumStorageDirs() + ", volumes configured: " + volsConfigured + ", volumes failed: " + volsFailed + ", volume failures tolerated: " + volFailuresTolerated); } final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>( storage.getNumStorageDirs()); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { final File dir = storage.getStorageDir(idx).getCurrentDir(); volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf)); LOG.info("Added volume - " + dir); } volumeMap = new ReplicaMap(this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl); volumes.getVolumeMap(volumeMap); File[] roots = new File[storage.getNumStorageDirs()]; for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { roots[idx] = storage.getStorageDir(idx).getCurrentDir(); } asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots); registerMBean(storage.getStorageID()); }
/** * An FSDataset has a directory where it loads its data files. */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { this.fsRunning = true; this.datanode = datanode; this.dataStorage = storage; this.conf = conf; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. final int volFailuresTolerated = conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf); int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsFailed = volsConfigured - storage.getNumStorageDirs(); this.validVolsRequired = volsConfigured - volFailuresTolerated; if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { throw new DiskErrorException("Invalid volume failure " + " config value: " + volFailuresTolerated); } if (volsFailed > volFailuresTolerated) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + storage.getNumStorageDirs() + ", volumes configured: " + volsConfigured + ", volumes failed: " + volsFailed + ", volume failures tolerated: " + volFailuresTolerated); } storageMap = new ConcurrentHashMap<String, DatanodeStorage>(); volumeMap = new ReplicaMap(this); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volsFailed, blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { addVolume(dataLocations, storage.getStorageDir(idx)); } setupAsyncLazyPersistThreads(); cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. lazyWriter = new Daemon(new LazyWriter(conf)); lazyWriter.start(); registerMBean(datanode.getDatanodeUuid()); }
/** * An FSDataset has a directory where it loads its data files. */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf) throws IOException { this.datanode = datanode; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. final int volFailuresTolerated = conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsFailed = volsConfigured - storage.getNumStorageDirs(); this.validVolsRequired = volsConfigured - volFailuresTolerated; if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { throw new DiskErrorException( "Invalid volume failure " + " config value: " + volFailuresTolerated); } if (volsFailed > volFailuresTolerated) { throw new DiskErrorException( "Too many failed volumes - " + "current valid volumes: " + storage.getNumStorageDirs() + ", volumes configured: " + volsConfigured + ", volumes failed: " + volsFailed + ", volume failures tolerated: " + volFailuresTolerated); } final List<FsVolumeImpl> volArray = new ArrayList<>(storage.getNumStorageDirs()); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { final File dir = storage.getStorageDir(idx).getCurrentDir(); volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf)); LOG.info("Added volume - " + dir); } volumeMap = new ReplicaMap(this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils .newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl); volumes.getVolumeMap(volumeMap); File[] roots = new File[storage.getNumStorageDirs()]; for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { roots[idx] = storage.getStorageDir(idx).getCurrentDir(); } asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots); registerMBean(storage.getStorageID()); NUM_BUCKETS = conf.getInt(DFSConfigKeys.DFS_NUM_BUCKETS_KEY, DFSConfigKeys.DFS_NUM_BUCKETS_DEFAULT); }