/** * Add a the location of a cached replica of the block. * * @param loc of datanode with the cached replica */ public void addCachedLoc(DatanodeInfo loc) { List<DatanodeInfo> cachedList = Lists.newArrayList(cachedLocs); if (cachedList.contains(loc)) { return; } // Try to re-use a DatanodeInfo already in loc for (DatanodeInfoWithStorage di : locs) { if (loc.equals(di)) { cachedList.add(di); cachedLocs = cachedList.toArray(cachedLocs); return; } } // Not present in loc, add it and go cachedList.add(loc); cachedLocs = cachedList.toArray(cachedLocs); }
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs, StorageType[] storageTypes, long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) { this.b = b; this.offset = startOffset; this.corrupt = corrupt; if (locs==null) { this.locs = EMPTY_LOCS; } else { this.locs = new DatanodeInfoWithStorage[locs.length]; for(int i = 0; i < locs.length; i++) { DatanodeInfo di = locs[i]; DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di, storageIDs != null ? storageIDs[i] : null, storageTypes != null ? storageTypes[i] : null); this.locs[i] = storage; } } this.storageIDs = storageIDs; this.storageTypes = storageTypes; if (cachedLocs == null || cachedLocs.length == 0) { this.cachedLocs = EMPTY_LOCS; } else { this.cachedLocs = cachedLocs; } }
/** * This test creates a LocatedBlock with 5 locations, sorts the locations * based on the network topology, and ensures the locations are still aligned * with the storage ids and storage types. */ @Test public void testSortLocatedBlocks() throws IOException { // create the DatanodeManager which will be tested FSNamesystem fsn = Mockito.mock(FSNamesystem.class); Mockito.when(fsn.hasWriteLock()).thenReturn(true); DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class), fsn, new Configuration()); // register 5 datanodes, each with different storage ID and type DatanodeInfo[] locs = new DatanodeInfo[5]; String[] storageIDs = new String[5]; StorageType[] storageTypes = new StorageType[]{ StorageType.ARCHIVE, StorageType.DEFAULT, StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD }; for(int i = 0; i < 5; i++) { // register new datanode String uuid = "UUID-"+i; String ip = "IP-" + i; DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class); Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid); Mockito.when(dr.getIpAddr()).thenReturn(ip); Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000"); Mockito.when(dr.getXferPort()).thenReturn(9000); Mockito.when(dr.getSoftwareVersion()).thenReturn("version1"); dm.registerDatanode(dr); // get location and storage information locs[i] = dm.getDatanode(uuid); storageIDs[i] = "storageID-"+i; } // set first 2 locations as decomissioned locs[0].setDecommissioned(); locs[1].setDecommissioned(); // create LocatedBlock with above locations ExtendedBlock b = new ExtendedBlock("somePoolID", 1234); LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes); List<LocatedBlock> blocks = new ArrayList<>(); blocks.add(block); final String targetIp = locs[4].getIpAddr(); // sort block locations dm.sortLocatedBlocks(targetIp, blocks); // check that storage IDs/types are aligned with datanode locs DatanodeInfo[] sortedLocs = block.getLocations(); storageIDs = block.getStorageIDs(); storageTypes = 
block.getStorageTypes(); assertThat(sortedLocs.length, is(5)); assertThat(storageIDs.length, is(5)); assertThat(storageTypes.length, is(5)); for(int i = 0; i < sortedLocs.length; i++) { assertThat(((DatanodeInfoWithStorage)sortedLocs[i]).getStorageID(), is(storageIDs[i])); assertThat(((DatanodeInfoWithStorage)sortedLocs[i]).getStorageType(), is(storageTypes[i])); } // Ensure the local node is first. assertThat(sortedLocs[0].getIpAddr(), is(targetIp)); // Ensure the two decommissioned DNs were moved to the end. assertThat(sortedLocs[sortedLocs.length-1].getAdminState(), is(DatanodeInfo.AdminStates.DECOMMISSIONED)); assertThat(sortedLocs[sortedLocs.length-2].getAdminState(), is(DatanodeInfo.AdminStates.DECOMMISSIONED)); }
/**
 * Build the bracketed, comma-separated listing of the storages of a block.
 * <p>
 * Returns an empty string unless at least one of showLocations, showRacks
 * or showReplicaDetails is set. For a complete block the storages come from
 * the block manager; otherwise the expected storage locations of the
 * under-construction feature are used. Each entry is the rack path when
 * showRacks is set, or the datanode with its storage ID and type otherwise.
 * With showReplicaDetails set, each entry is followed by one parenthesised
 * status, chosen in this order: DECOMMISSIONED, DECOMMISSIONING, CORRUPT,
 * EXCESS, STALE_NODE, STALE_BLOCK_CONTENT, LIVE.
 *
 * @param storedBlock the block whose storages are listed
 * @return the formatted listing, or "" when no display option is enabled
 */
private String getReplicaInfo(BlockInfo storedBlock) {
  if (!showLocations && !showRacks && !showReplicaDetails) {
    return "";
  }

  final DatanodeStorageInfo[] storages;
  if (storedBlock.isComplete()) {
    storages = blockManager.getStorages(storedBlock);
  } else {
    storages = storedBlock.getUnderConstructionFeature()
        .getExpectedStorageLocations();
  }

  StringBuilder out = new StringBuilder(" [");
  for (int idx = 0; idx < storages.length; idx++) {
    // Separator goes in front of every entry except the first.
    if (idx > 0) {
      out.append(", ");
    }

    final DatanodeStorageInfo storage = storages[idx];
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    if (showRacks) {
      out.append(NodeBase.getPath(node));
    } else {
      out.append(new DatanodeInfoWithStorage(node, storage.getStorageID(),
          storage.getStorageType()));
    }

    if (!showReplicaDetails) {
      continue;
    }

    LightWeightHashSet<BlockInfo> excessBlocks =
        blockManager.excessReplicateMap.get(node.getDatanodeUuid());
    Collection<DatanodeDescriptor> corruptNodes =
        blockManager.getCorruptReplicas(storedBlock);

    // The first matching condition determines the reported status.
    final String status;
    if (node.isDecommissioned()) {
      status = "DECOMMISSIONED)";
    } else if (node.isDecommissionInProgress()) {
      status = "DECOMMISSIONING)";
    } else if (corruptNodes != null && corruptNodes.contains(node)) {
      status = "CORRUPT)";
    } else if (excessBlocks != null && excessBlocks.contains(storedBlock)) {
      status = "EXCESS)";
    } else if (node.isStale(this.staleInterval)) {
      status = "STALE_NODE)";
    } else if (storage.areBlockContentsStale()) {
      status = "STALE_BLOCK_CONTENT)";
    } else {
      status = "LIVE)";
    }
    out.append("(").append(status);
  }
  out.append(']');
  return out.toString();
}