/** * Gets initial volume failure information for all volumes that failed * immediately at startup. The method works by determining the set difference * between all configured storage locations and the actual storage locations in * use after attempting to put all of them into service. * * @return each storage location that has failed */ private static List<VolumeFailureInfo> getInitialVolumeFailureInfos( Collection<StorageLocation> dataLocations, DataStorage storage) { Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize( dataLocations.size()); for (StorageLocation sl: dataLocations) { failedLocationSet.add(sl.getFile().getAbsolutePath()); } for (Iterator<Storage.StorageDirectory> it = storage.dirIterator(); it.hasNext(); ) { Storage.StorageDirectory sd = it.next(); failedLocationSet.remove(sd.getRoot().getAbsolutePath()); } List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity( failedLocationSet.size()); long failureDate = Time.now(); for (String failedStorageLocation: failedLocationSet) { volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation, failureDate)); } return volumeFailureInfos; }
private void addVolume(Collection<StorageLocation> dataLocations, Storage.StorageDirectory sd) throws IOException { final File dir = sd.getCurrentDir(); final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot()); // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is // nothing needed to be rolled back to make various data structures, e.g., // storageMap and asyncDiskService, consistent. FsVolumeImpl fsVolume = new FsVolumeImpl( this, sd.getStorageUuid(), dir, this.conf, storageType); ReplicaMap tempVolumeMap = new ReplicaMap(this); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); volumeMap.addAll(tempVolumeMap); volumes.addVolume(fsVolume); storageMap.put(sd.getStorageUuid(), new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType)); asyncDiskService.addVolume(sd.getCurrentDir()); LOG.info("Added volume - " + dir + ", StorageType: " + storageType); }
@Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; final int numExistingVolumes = dataset.getVolumes().size(); final int totalVolumes = numNewVolumes + numExistingVolumes; List<StorageLocation> newLocations = new ArrayList<StorageLocation>(); Set<String> expectedVolumes = new HashSet<String>(); for (int i = 0; i < numNewVolumes; i++) { String path = BASE_DIR + "/newData" + i; newLocations.add(StorageLocation.parse(path)); when(storage.getStorageDir(numExistingVolumes + i)) .thenReturn(createStorageDirectory(new File(path))); } when(storage.getNumStorageDirs()).thenReturn(totalVolumes); dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS)); assertEquals(totalVolumes, dataset.getVolumes().size()); assertEquals(totalVolumes, dataset.storageMap.size()); Set<String> actualVolumes = new HashSet<String>(); for (int i = 0; i < numNewVolumes; i++) { dataset.getVolumes().get(numExistingVolumes + i).getBasePath(); } assertEquals(actualVolumes, expectedVolumes); }
private void addVolume(Collection<StorageLocation> dataLocations, Storage.StorageDirectory sd) throws IOException { final File dir = sd.getCurrentDir(); final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot()); // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is // nothing needed to be rolled back to make various data structures, e.g., // storageMap and asyncDiskService, consistent. FsVolumeImpl fsVolume = new FsVolumeImpl( this, sd.getStorageUuid(), dir, this.conf, storageType); FsVolumeReference ref = fsVolume.obtainReference(); ReplicaMap tempVolumeMap = new ReplicaMap(this); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); synchronized (this) { volumeMap.addAll(tempVolumeMap); storageMap.put(sd.getStorageUuid(), new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType)); asyncDiskService.addVolume(sd.getCurrentDir()); volumes.addVolume(ref); } LOG.info("Added volume - " + dir + ", StorageType: " + storageType); }
private StorageType getStorageTypeFromLocations( Collection<StorageLocation> dataLocations, File dir) { for (StorageLocation dataLocation : dataLocations) { if (dataLocation.getFile().equals(dir)) { return dataLocation.getStorageType(); } } return StorageType.DEFAULT; }
@Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; final int numExistingVolumes = dataset.getVolumes().size(); final int totalVolumes = numNewVolumes + numExistingVolumes; Set<String> expectedVolumes = new HashSet<String>(); List<NamespaceInfo> nsInfos = Lists.newArrayList(); for (String bpid : BLOCK_POOL_IDS) { nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); } for (int i = 0; i < numNewVolumes; i++) { String path = BASE_DIR + "/newData" + i; String pathUri = new Path(path).toUri().toString(); expectedVolumes.add(new File(pathUri).toString()); StorageLocation loc = StorageLocation.parse(pathUri); Storage.StorageDirectory sd = createStorageDirectory(new File(path)); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), anyListOf(NamespaceInfo.class))) .thenReturn(builder); dataset.addVolume(loc, nsInfos); } assertEquals(totalVolumes, dataset.getVolumes().size()); assertEquals(totalVolumes, dataset.storageMap.size()); Set<String> actualVolumes = new HashSet<String>(); for (int i = 0; i < numNewVolumes; i++) { actualVolumes.add( dataset.getVolumes().get(numExistingVolumes + i).getBasePath()); } assertEquals(actualVolumes.size(), expectedVolumes.size()); assertTrue(actualVolumes.containsAll(expectedVolumes)); }
@Test(timeout = 5000) public void testRemoveNewlyAddedVolume() throws IOException { final int numExistingVolumes = dataset.getVolumes().size(); List<NamespaceInfo> nsInfos = new ArrayList<>(); for (String bpid : BLOCK_POOL_IDS) { nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); } String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater"; StorageLocation loc = StorageLocation.parse(newVolumePath); Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath)); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), anyListOf(NamespaceInfo.class))) .thenReturn(builder); dataset.addVolume(loc, nsInfos); assertEquals(numExistingVolumes + 1, dataset.getVolumes().size()); when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1); when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd); Set<File> volumesToRemove = new HashSet<>(); volumesToRemove.add(loc.getFile()); dataset.removeVolumes(volumesToRemove, true); assertEquals(numExistingVolumes, dataset.getVolumes().size()); }
@Test public void testAddVolumeFailureReleasesInUseLock() throws IOException { FsDatasetImpl spyDataset = spy(dataset); FsVolumeImpl mockVolume = mock(FsVolumeImpl.class); File badDir = new File(BASE_DIR, "bad"); badDir.mkdirs(); doReturn(mockVolume).when(spyDataset) .createFsVolume(anyString(), any(File.class), any(StorageType.class)); doThrow(new IOException("Failed to getVolumeMap()")) .when(mockVolume).getVolumeMap( anyString(), any(ReplicaMap.class), any(RamDiskReplicaLruTracker.class)); Storage.StorageDirectory sd = createStorageDirectory(badDir); sd.lock(); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(badDir.getAbsoluteFile()), Matchers.<List<NamespaceInfo>>any())) .thenReturn(builder); StorageLocation location = StorageLocation.parse(badDir.toString()); List<NamespaceInfo> nsInfos = Lists.newArrayList(); for (String bpid : BLOCK_POOL_IDS) { nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); } try { spyDataset.addVolume(location, nsInfos); fail("Expect to throw MultipleIOException"); } catch (MultipleIOException e) { } FsDatasetTestUtil.assertFileLockReleased(badDir.toString()); }
@Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; final int numExistingVolumes = getNumVolumes(); final int totalVolumes = numNewVolumes + numExistingVolumes; Set<String> expectedVolumes = new HashSet<String>(); List<NamespaceInfo> nsInfos = Lists.newArrayList(); for (String bpid : BLOCK_POOL_IDS) { nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); } for (int i = 0; i < numNewVolumes; i++) { String path = BASE_DIR + "/newData" + i; String pathUri = new Path(path).toUri().toString(); expectedVolumes.add(new File(pathUri).toString()); StorageLocation loc = StorageLocation.parse(pathUri); Storage.StorageDirectory sd = createStorageDirectory(new File(path)); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), anyListOf(NamespaceInfo.class))) .thenReturn(builder); dataset.addVolume(loc, nsInfos); } assertEquals(totalVolumes, getNumVolumes()); assertEquals(totalVolumes, dataset.storageMap.size()); Set<String> actualVolumes = new HashSet<String>(); try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) { for (int i = 0; i < numNewVolumes; i++) { actualVolumes.add(volumes.get(numExistingVolumes + i).getBasePath()); } } assertEquals(actualVolumes.size(), expectedVolumes.size()); assertTrue(actualVolumes.containsAll(expectedVolumes)); }
@Test(timeout = 5000) public void testRemoveNewlyAddedVolume() throws IOException { final int numExistingVolumes = getNumVolumes(); List<NamespaceInfo> nsInfos = new ArrayList<>(); for (String bpid : BLOCK_POOL_IDS) { nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); } String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater"; StorageLocation loc = StorageLocation.parse(newVolumePath); Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath)); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), anyListOf(NamespaceInfo.class))) .thenReturn(builder); dataset.addVolume(loc, nsInfos); assertEquals(numExistingVolumes + 1, getNumVolumes()); when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1); when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd); Set<File> volumesToRemove = new HashSet<>(); volumesToRemove.add(loc.getFile()); dataset.removeVolumes(volumesToRemove, true); assertEquals(numExistingVolumes, getNumVolumes()); }
@Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; final int numExistingVolumes = dataset.getVolumes().size(); final int totalVolumes = numNewVolumes + numExistingVolumes; Set<String> expectedVolumes = new HashSet<String>(); List<NamespaceInfo> nsInfos = Lists.newArrayList(); for (String bpid : BLOCK_POOL_IDS) { nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); } for (int i = 0; i < numNewVolumes; i++) { String path = BASE_DIR + "/newData" + i; expectedVolumes.add(path); StorageLocation loc = StorageLocation.parse(path); Storage.StorageDirectory sd = createStorageDirectory(new File(path)); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), anyListOf(NamespaceInfo.class))) .thenReturn(builder); dataset.addVolume(loc, nsInfos); } assertEquals(totalVolumes, dataset.getVolumes().size()); assertEquals(totalVolumes, dataset.storageMap.size()); Set<String> actualVolumes = new HashSet<String>(); for (int i = 0; i < numNewVolumes; i++) { actualVolumes.add( dataset.getVolumes().get(numExistingVolumes + i).getBasePath()); } assertEquals(actualVolumes, expectedVolumes); }
@Test public void testAddVolumeFailureReleasesInUseLock() throws IOException { FsDatasetImpl spyDataset = spy(dataset); FsVolumeImpl mockVolume = mock(FsVolumeImpl.class); File badDir = new File(BASE_DIR, "bad"); badDir.mkdirs(); doReturn(mockVolume).when(spyDataset) .createFsVolume(anyString(), any(File.class), any(StorageType.class)); doThrow(new IOException("Failed to getVolumeMap()")) .when(mockVolume).getVolumeMap( anyString(), any(ReplicaMap.class), any(RamDiskReplicaLruTracker.class)); Storage.StorageDirectory sd = createStorageDirectory(badDir); sd.lock(); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(badDir), Matchers.<List<NamespaceInfo>>any())) .thenReturn(builder); StorageLocation location = StorageLocation.parse(badDir.toString()); List<NamespaceInfo> nsInfos = Lists.newArrayList(); for (String bpid : BLOCK_POOL_IDS) { nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); } try { spyDataset.addVolume(location, nsInfos); fail("Expect to throw MultipleIOException"); } catch (MultipleIOException e) { } FsDatasetTestUtil.assertFileLockReleased(badDir.toString()); }
private void addVolumeAndBlockPool(Collection<StorageLocation> dataLocations, Storage.StorageDirectory sd, final Collection<String> bpids) throws IOException { final File dir = sd.getCurrentDir(); final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot()); final FsVolumeImpl fsVolume = new FsVolumeImpl( this, sd.getStorageUuid(), dir, this.conf, storageType); final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume); List<IOException> exceptions = Lists.newArrayList(); for (final String bpid : bpids) { try { fsVolume.addBlockPool(bpid, this.conf); fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker); } catch (IOException e) { LOG.warn("Caught exception when adding " + fsVolume + ". Will throw later.", e); exceptions.add(e); } } if (!exceptions.isEmpty()) { // The states of FsDatasteImpl are not modified, thus no need to rolled back. throw MultipleIOException.createIOException(exceptions); } volumeMap.addAll(tempVolumeMap); storageMap.put(sd.getStorageUuid(), new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType)); asyncDiskService.addVolume(sd.getCurrentDir()); volumes.addVolume(fsVolume); LOG.info("Added volume - " + dir + ", StorageType: " + storageType); }
/** * An FSDataset has a directory where it loads its data files. */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { this.fsRunning = true; this.datanode = datanode; this.dataStorage = storage; this.conf = conf; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. final int volFailuresTolerated = conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf); List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos( dataLocations, storage); int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsFailed = volumeFailureInfos.size(); this.validVolsRequired = volsConfigured - volFailuresTolerated; if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { throw new DiskErrorException("Invalid volume failure " + " config value: " + volFailuresTolerated); } if (volsFailed > volFailuresTolerated) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + storage.getNumStorageDirs() + ", volumes configured: " + volsConfigured + ", volumes failed: " + volsFailed + ", volume failures tolerated: " + volFailuresTolerated); } storageMap = new ConcurrentHashMap<String, DatanodeStorage>(); volumeMap = new ReplicaMap(this); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode, this); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); deletingBlock = new HashMap<String, Set<Long>>(); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { addVolume(dataLocations, storage.getStorageDir(idx)); } setupAsyncLazyPersistThreads(); cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. lazyWriter = new Daemon(new LazyWriter(conf)); lazyWriter.start(); registerMBean(datanode.getDatanodeUuid()); localFS = FileSystem.getLocal(conf); blockPinningEnabled = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT); }
@Test(timeout = 30000) public void testGetReconfigureStatus() throws IOException, InterruptedException { ReconfigurationUtil ru = mock(ReconfigurationUtil.class); datanode.setReconfigurationUtil(ru); List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<ReconfigurationUtil.PropertyChange>(); File newDir = new File(cluster.getDataDirectory(), "data_new"); newDir.mkdirs(); changes.add(new ReconfigurationUtil.PropertyChange( DFS_DATANODE_DATA_DIR_KEY, newDir.toString(), datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); changes.add(new ReconfigurationUtil.PropertyChange( "randomKey", "new123", "old456")); when(ru.parseChangedProperties(any(Configuration.class), any(Configuration.class))).thenReturn(changes); final int port = datanode.getIpcPort(); final String address = "localhost:" + port; assertThat(admin.startReconfiguration("datanode", address), is(0)); List<String> outputs = null; int count = 100; while (count > 0) { outputs = getReconfigureStatus("datanode", address); if (!outputs.isEmpty() && outputs.get(0).contains("finished")) { break; } count--; Thread.sleep(100); } assertTrue(count > 0); assertThat(outputs.size(), is(8)); // 3 (SUCCESS) + 4 (FAILED) List<StorageLocation> locations = DataNode.getStorageLocations( datanode.getConf()); assertThat(locations.size(), is(1)); assertThat(locations.get(0).getFile(), is(newDir)); // Verify the directory is appropriately formatted. assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory()); int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5; int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1: 4; assertThat(outputs.get(successOffset), containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY)); assertThat(outputs.get(successOffset + 1), is(allOf(containsString("From:"), containsString("data1"), containsString("data2")))); assertThat(outputs.get(successOffset + 2), is(not(anyOf(containsString("data1"), containsString("data2"))))); assertThat(outputs.get(successOffset + 2), is(allOf(containsString("To"), containsString("data_new")))); assertThat(outputs.get(failedOffset), containsString("Change property randomKey")); assertThat(outputs.get(failedOffset + 1), containsString("From: \"old456\"")); assertThat(outputs.get(failedOffset + 2), containsString("To: \"new123\"")); }
/** * Test reconfiguration and check the status outputs. * @param expectedSuccuss set true if the reconfiguration task should success. * @throws IOException * @throws InterruptedException */ private void testGetReconfigurationStatus(boolean expectedSuccuss) throws IOException, InterruptedException { ReconfigurationUtil ru = mock(ReconfigurationUtil.class); datanode.setReconfigurationUtil(ru); List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>(); File newDir = new File(cluster.getDataDirectory(), "data_new"); if (expectedSuccuss) { newDir.mkdirs(); } else { // Inject failure. newDir.createNewFile(); } changes.add(new ReconfigurationUtil.PropertyChange( DFS_DATANODE_DATA_DIR_KEY, newDir.toString(), datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); changes.add(new ReconfigurationUtil.PropertyChange( "randomKey", "new123", "old456")); when(ru.parseChangedProperties(any(Configuration.class), any(Configuration.class))).thenReturn(changes); final int port = datanode.getIpcPort(); final String address = "localhost:" + port; assertThat(admin.startReconfiguration("datanode", address), is(0)); List<String> outputs = null; int count = 100; while (count > 0) { outputs = getReconfigureStatus("datanode", address); if (!outputs.isEmpty() && outputs.get(0).contains("finished")) { break; } count--; Thread.sleep(100); } assertTrue(count > 0); if (expectedSuccuss) { assertThat(outputs.size(), is(4)); } else { assertThat(outputs.size(), is(6)); } List<StorageLocation> locations = DataNode.getStorageLocations( datanode.getConf()); if (expectedSuccuss) { assertThat(locations.size(), is(1)); assertThat(locations.get(0).getFile(), is(newDir)); // Verify the directory is appropriately formatted. assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory()); } else { assertTrue(locations.isEmpty()); } int offset = 1; if (expectedSuccuss) { assertThat(outputs.get(offset), containsString("SUCCESS: Changed property " + DFS_DATANODE_DATA_DIR_KEY)); } else { assertThat(outputs.get(offset), containsString("FAILED: Change property " + DFS_DATANODE_DATA_DIR_KEY)); } assertThat(outputs.get(offset + 1), is(allOf(containsString("From:"), containsString("data1"), containsString("data2")))); assertThat(outputs.get(offset + 2), is(not(anyOf(containsString("data1"), containsString("data2"))))); assertThat(outputs.get(offset + 2), is(allOf(containsString("To"), containsString("data_new")))); }
/** * An FSDataset has a directory where it loads its data files. */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { this.fsRunning = true; this.datanode = datanode; this.dataStorage = storage; this.conf = conf; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. volFailuresTolerated = conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf); List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos( dataLocations, storage); int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsFailed = volumeFailureInfos.size(); if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { throw new DiskErrorException("Invalid volume failure " + " config value: " + volFailuresTolerated); } if (volsFailed > volFailuresTolerated) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + storage.getNumStorageDirs() + ", volumes configured: " + volsConfigured + ", volumes failed: " + volsFailed + ", volume failures tolerated: " + volFailuresTolerated); } storageMap = new ConcurrentHashMap<String, DatanodeStorage>(); volumeMap = new ReplicaMap(this); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { addVolume(dataLocations, storage.getStorageDir(idx)); } setupAsyncLazyPersistThreads(); cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. lazyWriter = new Daemon(new LazyWriter(conf)); lazyWriter.start(); registerMBean(datanode.getDatanodeUuid()); }
/** Add an array of StorageLocation to FsDataset. */ public List<StorageLocation> addVolumes(List<StorageLocation> volumes, final Collection<String> bpids);
/** Removes a collection of volumes from FsDataset. */ public void removeVolumes(Collection<StorageLocation> volumes);
/** * An FSDataset has a directory where it loads its data files. */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { this.fsRunning = true; this.datanode = datanode; this.dataStorage = storage; this.conf = conf; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. final int volFailuresTolerated = conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf); int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsFailed = volsConfigured - storage.getNumStorageDirs(); this.validVolsRequired = volsConfigured - volFailuresTolerated; if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { throw new DiskErrorException("Invalid volume failure " + " config value: " + volFailuresTolerated); } if (volsFailed > volFailuresTolerated) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + storage.getNumStorageDirs() + ", volumes configured: " + volsConfigured + ", volumes failed: " + volsFailed + ", volume failures tolerated: " + volFailuresTolerated); } storageMap = new ConcurrentHashMap<String, DatanodeStorage>(); volumeMap = new ReplicaMap(this); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volsFailed, blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { addVolume(dataLocations, storage.getStorageDir(idx)); } setupAsyncLazyPersistThreads(); cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. lazyWriter = new Daemon(new LazyWriter(conf)); lazyWriter.start(); registerMBean(datanode.getDatanodeUuid()); }
/** * Removes a collection of volumes from FsDataset. * @param volumes the root directories of the volumes. * * DataNode should call this function before calling * {@link DataStorage#removeVolumes(java.util.Collection)}. */ @Override public synchronized void removeVolumes(Collection<StorageLocation> volumes) { Set<File> volumeSet = new HashSet<File>(); for (StorageLocation sl : volumes) { volumeSet.add(sl.getFile()); } for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); if (volumeSet.contains(sd.getRoot())) { String volume = sd.getRoot().toString(); LOG.info("Removing " + volume + " from FsDataset."); // Disable the volume from the service. asyncDiskService.removeVolume(sd.getCurrentDir()); this.volumes.removeVolume(volume); // Removed all replica information for the blocks on the volume. Unlike // updating the volumeMap in addVolume(), this operation does not scan // disks. for (String bpid : volumeMap.getBlockPoolList()) { List<Block> blocks = new ArrayList<Block>(); for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator(); it.hasNext(); ) { ReplicaInfo block = it.next(); if (block.getVolume().getBasePath().equals(volume)) { invalidate(bpid, block); blocks.add(block); it.remove(); } } // Delete blocks from the block scanner in batch. datanode.getBlockScanner().deleteBlocks(bpid, blocks.toArray(new Block[blocks.size()])); } storageMap.remove(sd.getStorageUuid()); } } setupAsyncLazyPersistThreads(); }