@Override
public void transferBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes) throws IOException {
  OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
      .setHeader(DataTransferProtoUtil.buildClientHeader(
          blk, clientName, blockToken))
      .addAllTargets(PBHelper.convert(targets))
      .addAllTargetStorageTypes(
          PBHelper.convertStorageTypes(targetStorageTypes))
      .build();

  send(out, Op.TRANSFER_BLOCK, proto);
}
/**
 * Get all valid locations of the block and add the block to results.
 * @return the length of the added block; 0 if the block is not added.
 */
private long addBlock(Block block, List<BlockWithLocations> results) {
  final List<DatanodeStorageInfo> locations = getValidLocations(block);
  if (locations.size() == 0) {
    return 0;
  } else {
    final String[] datanodeUuids = new String[locations.size()];
    final String[] storageIDs = new String[datanodeUuids.length];
    final StorageType[] storageTypes = new StorageType[datanodeUuids.length];
    for (int i = 0; i < locations.size(); i++) {
      final DatanodeStorageInfo s = locations.get(i);
      datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
      storageIDs[i] = s.getStorageID();
      storageTypes[i] = s.getStorageType();
    }
    results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
        storageTypes));
    return block.getNumBytes();
  }
}
/**
 * @return a list of {@link StorageType}s for storing the replicas of a block.
 */
public List<StorageType> chooseStorageTypes(final short replication) {
  final List<StorageType> types = new LinkedList<StorageType>();
  int i = 0, j = 0;

  // Do not return transient storage types. We will not have accurate
  // usage information for transient types.
  for (; i < replication && j < storageTypes.length; ++j) {
    if (!storageTypes[j].isTransient()) {
      types.add(storageTypes[j]);
      ++i;
    }
  }

  // If the policy runs out of distinct types, pad the list up to the
  // replication count with the last type, provided it is not transient.
  final StorageType last = storageTypes[storageTypes.length - 1];
  if (!last.isTransient()) {
    for (; i < replication; i++) {
      types.add(last);
    }
  }
  return types;
}
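// Usage sketch (illustrative, not part of the source): a policy whose
// storageTypes are [SSD, DISK] pads with the last non-transient type, so
// replication 3 yields [SSD, DISK, DISK]. The policy below is built with the
// @VisibleForTesting constructor shown later in this section; the id and
// name are made up for the example.
BlockStoragePolicy oneSsdLike = new BlockStoragePolicy((byte) 10,
    "EXAMPLE_ONE_SSD",
    new StorageType[]{StorageType.SSD, StorageType.DISK},
    new StorageType[]{StorageType.DISK},
    new StorageType[]{StorageType.DISK},
    false);
List<StorageType> chosen = oneSsdLike.chooseStorageTypes((short) 3);
// chosen == [SSD, DISK, DISK]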
private Replication getOrVerifyReplication(Path file, Replication expected)
    throws IOException {
  final List<LocatedBlock> lbs = dfs.getClient().getLocatedBlocks(
      file.toString(), 0).getLocatedBlocks();
  Assert.assertEquals(1, lbs.size());

  LocatedBlock lb = lbs.get(0);
  StringBuilder types = new StringBuilder();
  final Replication r = new Replication();
  for (StorageType t : lb.getStorageTypes()) {
    types.append(t).append(", ");
    if (t == StorageType.DISK) {
      r.disk++;
    } else if (t == StorageType.ARCHIVE) {
      r.archive++;
    } else {
      Assert.fail("Unexpected storage type " + t);
    }
  }

  if (expected != null) {
    final String s = "file = " + file + "\n types = [" + types + "]";
    Assert.assertEquals(s, expected, r);
  }
  return r;
}
private static StorageType[][] genStorageTypes(int numDataNodes,
    int numAllDisk, int numAllArchive, int numRamDisk) {
  Preconditions.checkArgument(
      (numAllDisk + numAllArchive + numRamDisk) <= numDataNodes);

  StorageType[][] types = new StorageType[numDataNodes][];
  int i = 0;
  for (; i < numRamDisk; i++) {
    types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK};
  }
  for (; i < numRamDisk + numAllDisk; i++) {
    types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
  }
  for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
    types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
  }
  for (; i < types.length; i++) {
    types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
  }
  return types;
}
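// Illustration (derived from the loops above, not in the source): for a
// 4-node cluster with one RAM_DISK node, one all-DISK node, and one
// all-ARCHIVE node, the per-datanode volume types are:
//
//   genStorageTypes(4, 1, 1, 1) ==
//     { {RAM_DISK, DISK},      // node 0
//       {DISK, DISK},          // node 1
//       {ARCHIVE, ARCHIVE},    // node 2
//       {DISK, ARCHIVE} }      // node 3 (the default mix)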
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(
    StorageType storageType, ExtendedBlock b) throws IOException {
  if (isValidBlock(b)) {
    throw new ReplicaAlreadyExistsException("Block " + b
        + " is valid, and cannot be written to.");
  }
  if (isValidRbw(b)) {
    throw new ReplicaAlreadyExistsException("Block " + b
        + " is being written, and cannot be written to.");
  }
  final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
  BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
  map.put(binfo.theBlock, binfo);
  return new ReplicaHandler(binfo, null);
}
private DatanodeStorageInfo chooseFromNextRack(Node next,
    Set<Node> excludedNodes,
    long blocksize,
    int maxNodesPerRack,
    List<DatanodeStorageInfo> results,
    boolean avoidStaleNodes,
    EnumMap<StorageType, Integer> storageTypes)
    throws NotEnoughReplicasException {
  final String nextRack = next.getNetworkLocation();
  try {
    return chooseRandom(nextRack, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
  } catch (NotEnoughReplicasException e) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Failed to choose from the next rack (location = " + nextRack
          + "), retry choosing randomly", e);
    }
    // Otherwise randomly choose one from the entire network.
    return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  }
}
/**
 * @return true if the given block is good for the tentative move.
 */
private boolean markMovedIfGoodBlock(DBlock block,
    StorageType targetStorageType) {
  synchronized (block) {
    synchronized (movedBlocks) {
      if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
        this.block = block;
        if (chooseProxySource()) {
          movedBlocks.put(block);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Decided to move " + this);
          }
          return true;
        }
      }
    }
  }
  return false;
}
/**
 * Attempt to parse a storage URI with an optional storage class prefix.
 * The storage class component of the URI is case-insensitive.
 *
 * @param rawLocation Location string of the format [type]uri, where [type]
 *          is optional.
 * @return A StorageLocation object if successfully parsed.
 * @throws IOException if the location cannot be converted to a URI.
 */
public static StorageLocation parse(String rawLocation)
    throws IOException, SecurityException {
  Matcher matcher = regex.matcher(rawLocation);
  StorageType storageType = StorageType.DEFAULT;
  String location = rawLocation;

  if (matcher.matches()) {
    String classString = matcher.group(1);
    location = matcher.group(2);
    if (!classString.isEmpty()) {
      storageType = StorageType.valueOf(classString.toUpperCase());
    }
  }

  return new StorageLocation(storageType, Util.stringAsURI(location));
}
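// Usage sketch (the paths are made up for illustration): a volume string may
// carry a bracketed storage class prefix; without one, StorageType.DEFAULT
// is assumed.
StorageLocation ssd = StorageLocation.parse("[SSD]/mnt/dn0");   // type SSD
StorageLocation plain = StorageLocation.parse("/mnt/dn1");      // type DEFAULT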
private void verifyFile(final Path parent, final HdfsFileStatus status,
    final Byte expectedPolicyId) throws Exception {
  HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
  byte policyId = fileStatus.getStoragePolicy();
  BlockStoragePolicy policy = policies.getPolicy(policyId);
  if (expectedPolicyId != null) {
    Assert.assertEquals((byte) expectedPolicyId, policy.getId());
  }

  final List<StorageType> types = policy.chooseStorageTypes(
      status.getReplication());
  for (LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
    final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
        lb.getStorageTypes());
    Assert.assertTrue(fileStatus.getFullName(parent.toString())
        + " with policy " + policy + " has non-empty overlap: " + diff
        + ", the corresponding block is " + lb.getBlock().getLocalBlock(),
        diff.removeOverlap(true));
  }
}
/**
 * BlockRecoveryFI_10. DN has no ReplicaUnderRecovery.
 *
 * @throws IOException in case of an error
 */
@Test
public void testNoReplicaUnderRecovery() throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }
  dn.data.createRbw(StorageType.DEFAULT, block, false);
  try {
    dn.syncBlock(rBlock, initBlockRecords(dn));
    fail("Sync should fail");
  } catch (IOException e) {
    assertTrue(e.getMessage().startsWith("Cannot recover "));
  }

  DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
  verify(namenode, never()).commitBlockSynchronization(
      any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
      anyBoolean(), any(DatanodeID[].class), any(String[].class));
}
public static LocatedBlockProto convert(LocatedBlock b) {
  if (b == null) return null;
  Builder builder = LocatedBlockProto.newBuilder();
  DatanodeInfo[] locs = b.getLocations();
  List<DatanodeInfo> cachedLocs =
      Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
  for (int i = 0; i < locs.length; i++) {
    DatanodeInfo loc = locs[i];
    builder.addLocs(i, PBHelper.convert(loc));
    boolean locIsCached = cachedLocs.contains(loc);
    builder.addIsCached(locIsCached);
    if (locIsCached) {
      cachedLocs.remove(loc);
    }
  }
  Preconditions.checkArgument(cachedLocs.size() == 0,
      "Found additional cached replica locations that are not in the set of"
          + " storage-backed locations!");

  StorageType[] storageTypes = b.getStorageTypes();
  if (storageTypes != null) {
    for (int i = 0; i < storageTypes.length; ++i) {
      builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
    }
  }
  final String[] storageIDs = b.getStorageIDs();
  if (storageIDs != null) {
    builder.addAllStorageIDs(Arrays.asList(storageIDs));
  }

  return builder.setB(PBHelper.convert(b.getBlock()))
      .setBlockToken(PBHelper.convert(b.getBlockToken()))
      .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
    StorageType[] storageTypes, long startOffset,
    boolean corrupt, DatanodeInfo[] cachedLocs) {
  this.b = b;
  this.offset = startOffset;
  this.corrupt = corrupt;
  if (locs == null) {
    this.locs = EMPTY_LOCS;
  } else {
    this.locs = new DatanodeInfoWithStorage[locs.length];
    for (int i = 0; i < locs.length; i++) {
      DatanodeInfo di = locs[i];
      DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
          storageIDs != null ? storageIDs[i] : null,
          storageTypes != null ? storageTypes[i] : null);
      this.locs[i] = storage;
    }
  }
  this.storageIDs = storageIDs;
  this.storageTypes = storageTypes;
  if (cachedLocs == null || cachedLocs.length == 0) {
    this.cachedLocs = EMPTY_LOCS;
  } else {
    this.cachedLocs = cachedLocs;
  }
}
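// Note on alignment (illustrative, not in the source): locs, storageIDs, and
// storageTypes are parallel arrays -- index i of each describes the same
// replica. Either storage array may be null, in which case each
// DatanodeInfoWithStorage is created with a null storage ID or type.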
@VisibleForTesting
public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
    StorageType[] creationFallbacks, StorageType[] replicationFallbacks,
    boolean copyOnCreateFile) {
  this.id = id;
  this.name = name;
  this.storageTypes = storageTypes;
  this.creationFallbacks = creationFallbacks;
  this.replicationFallbacks = replicationFallbacks;
  this.copyOnCreateFile = copyOnCreateFile;
}
@Test
public void testConvertDatanodeStorage() {
  DatanodeStorage dns1 = new DatanodeStorage(
      "id1", DatanodeStorage.State.NORMAL, StorageType.SSD);

  DatanodeStorageProto proto = PBHelper.convert(dns1);
  DatanodeStorage dns2 = PBHelper.convert(proto);
  compare(dns1, dns2);
}
/**
 * Choose the storage types for storing the remaining replicas, given the
 * replication number, the storage types of the chosen replicas and
 * the unavailable storage types. It uses fallback storage when the
 * desired storage type is unavailable.
 *
 * @param replication the replication number.
 * @param chosen the storage types of the chosen replicas.
 * @param unavailables the unavailable storage types.
 * @param isNewBlock Is it for new block creation?
 * @return a list of {@link StorageType}s for storing the replicas of a block.
 */
public List<StorageType> chooseStorageTypes(final short replication,
    final Iterable<StorageType> chosen,
    final EnumSet<StorageType> unavailables,
    final boolean isNewBlock) {
  final List<StorageType> excess = new LinkedList<StorageType>();
  final List<StorageType> storageTypes = chooseStorageTypes(
      replication, chosen, excess);
  final int expectedSize = storageTypes.size() - excess.size();
  final List<StorageType> removed = new LinkedList<StorageType>();
  for (int i = storageTypes.size() - 1; i >= 0; i--) {
    // Replace/remove unavailable storage types.
    final StorageType t = storageTypes.get(i);
    if (unavailables.contains(t)) {
      final StorageType fallback = isNewBlock
          ? getCreationFallback(unavailables)
          : getReplicationFallback(unavailables);
      if (fallback == null) {
        removed.add(storageTypes.remove(i));
      } else {
        storageTypes.set(i, fallback);
      }
    }
  }
  // Remove excess storage types after fallback replacement.
  diff(storageTypes, excess, null);
  if (storageTypes.size() < expectedSize) {
    LOG.warn("Failed to place enough replicas: expected size is "
        + expectedSize + " but only " + storageTypes.size()
        + " storage types can be selected (replication=" + replication
        + ", selected=" + storageTypes + ", unavailable=" + unavailables
        + ", removed=" + removed + ", policy=" + this + ")");
  }
  return storageTypes;
}
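// Illustration (hypothetical policy and inputs; the two-arg/three-arg
// chooseStorageTypes overload used internally is not shown in this section,
// so its behavior is assumed from the single-arg variant above): for a
// COLD-like policy preferring ARCHIVE with DISK as the replication fallback,
// asking for replication 2 with no replicas chosen yet while ARCHIVE is
// unavailable replaces each ARCHIVE slot with the fallback.
BlockStoragePolicy coldLike = new BlockStoragePolicy((byte) 2, "EXAMPLE_COLD",
    new StorageType[]{StorageType.ARCHIVE},  // preferred types
    new StorageType[0],                      // creation fallbacks
    new StorageType[]{StorageType.DISK},     // replication fallbacks
    false);
List<StorageType> t = coldLike.chooseStorageTypes((short) 2,
    Collections.<StorageType>emptyList(),
    EnumSet.of(StorageType.ARCHIVE),
    false /* isNewBlock */);
// Expected: t == [DISK, DISK]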
public static BlockCommandProto convert(BlockCommand cmd) {
  BlockCommandProto.Builder builder = BlockCommandProto.newBuilder()
      .setBlockPoolId(cmd.getBlockPoolId());
  switch (cmd.getAction()) {
  case DatanodeProtocol.DNA_TRANSFER:
    builder.setAction(BlockCommandProto.Action.TRANSFER);
    break;
  case DatanodeProtocol.DNA_INVALIDATE:
    builder.setAction(BlockCommandProto.Action.INVALIDATE);
    break;
  case DatanodeProtocol.DNA_SHUTDOWN:
    builder.setAction(BlockCommandProto.Action.SHUTDOWN);
    break;
  default:
    throw new AssertionError("Invalid action");
  }
  Block[] blocks = cmd.getBlocks();
  for (int i = 0; i < blocks.length; i++) {
    builder.addBlocks(PBHelper.convert(blocks[i]));
  }
  builder.addAllTargets(convert(cmd.getTargets()))
      .addAllTargetStorageUuids(convert(cmd.getTargetStorageIDs()));
  StorageType[][] types = cmd.getTargetStorageTypes();
  if (types != null) {
    builder.addAllTargetStorageTypes(convert(types));
  }
  return builder.build();
}
void reportBadBlocks(ExtendedBlock block, String storageUuid,
    StorageType storageType) {
  checkBlock(block);
  for (BPServiceActor actor : bpServices) {
    actor.reportBadBlocks(block, storageUuid, storageType);
  }
}
public static StorageType convertStorageType(StorageTypeProto type) {
  switch (type) {
  case DISK:
    return StorageType.DISK;
  case SSD:
    return StorageType.SSD;
  case ARCHIVE:
    return StorageType.ARCHIVE;
  case RAM_DISK:
    return StorageType.RAM_DISK;
  default:
    throw new IllegalStateException(
        "BUG: StorageTypeProto not found, type=" + type);
  }
}
void initAvgUtilization() {
  for (StorageType t : StorageType.asList()) {
    final long capacity = totalCapacities.get(t);
    if (capacity > 0L) {
      final double avg = totalUsedSpaces.get(t) * 100.0 / capacity;
      avgUtilizations.set(t, avg);
    }
  }
}
public static StorageType[] convertStorageTypes(
    List<StorageTypeProto> storageTypesList, int expectedSize) {
  final StorageType[] storageTypes = new StorageType[expectedSize];
  if (storageTypesList.size() != expectedSize) {
    // missing storage types
    Preconditions.checkState(storageTypesList.isEmpty());
    Arrays.fill(storageTypes, StorageType.DEFAULT);
  } else {
    for (int i = 0; i < storageTypes.length; ++i) {
      storageTypes[i] = convertStorageType(storageTypesList.get(i));
    }
  }
  return storageTypes;
}
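// Illustration (not in the source): an older peer may omit the repeated
// storage-type field entirely, in which case every slot is filled with
// StorageType.DEFAULT; a partially populated list fails the checkState.
StorageType[] filled = PBHelper.convertStorageTypes(
    Collections.<StorageTypeProto>emptyList(), 3);
// filled == [DEFAULT, DEFAULT, DEFAULT]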
/**
 * Test writes a file and closes it.
 * The block report is generated with an extra block.
 * The block report is forced and the check for the number of
 * pending deletion blocks is performed.
 *
 * @throws IOException in case of an error
 */
@Test(timeout = 300000)
public void blockReport_04() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path filePath = new Path("/" + METHOD_NAME + ".dat");
  DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR,
      rand.nextLong());

  DataNode dn = cluster.getDataNodes().get(DN_N0);
  // All blocks belong to the same file, hence the same BP.
  String poolId = cluster.getNamesystem().getBlockPoolId();

  // Create a bogus new block which will not be present on the namenode.
  ExtendedBlock b = new ExtendedBlock(
      poolId, rand.nextLong(), 1024L, rand.nextLong());
  dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);

  DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
  StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
  sendBlockReports(dnR, poolId, reports);
  printStats();

  assertThat("Wrong number of corrupt blocks",
      cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
  assertThat("Wrong number of PendingDeletion blocks",
      cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
}
/** Decrement the number of blocks scheduled. */
void decrementBlocksScheduled(StorageType t) {
  if (prevApproxBlocksScheduled.get(t) > 0) {
    prevApproxBlocksScheduled.subtract(t, 1);
  } else if (currApproxBlocksScheduled.get(t) > 0) {
    currApproxBlocksScheduled.subtract(t, 1);
  }
  // It's OK if both counters are zero.
}
public static StorageTypeProto convertStorageType(StorageType type) {
  switch (type) {
  case DISK:
    return StorageTypeProto.DISK;
  case SSD:
    return StorageTypeProto.SSD;
  case ARCHIVE:
    return StorageTypeProto.ARCHIVE;
  case RAM_DISK:
    return StorageTypeProto.RAM_DISK;
  default:
    throw new IllegalStateException(
        "BUG: StorageType not found, type=" + type);
  }
}
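// Round-trip sketch (illustrative; assumes the enum contains exactly the
// four types handled above): this converter and its inverse earlier in the
// section should map each value onto itself, relying on overload resolution
// to pick the StorageType and StorageTypeProto variants.
for (StorageType st : StorageType.values()) {
  assert st == PBHelper.convertStorageType(PBHelper.convertStorageType(st));
}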
public static StorageTypesProto convert(StorageType[] types) {
  if (types == null || types.length == 0) {
    return null;
  }
  List<StorageTypeProto> list = convertStorageTypes(types);
  return StorageTypesProto.newBuilder().addAllStorageTypes(list).build();
}
/** Decide if the given block is a good candidate to move or not. */
private boolean isGoodBlockCandidate(DBlock block) {
  // Source and target must have the same storage type.
  final StorageType sourceStorageType = getStorageType();
  for (Task t : tasks) {
    if (Dispatcher.this.isGoodBlockCandidate(this, t.target,
        sourceStorageType, block)) {
      return true;
    }
  }
  return false;
}
@Test
public void testConvertBlockCommand() {
  Block[] blocks = new Block[] { new Block(21), new Block(22) };
  DatanodeInfo[][] dnInfos = new DatanodeInfo[][] { new DatanodeInfo[1],
      new DatanodeInfo[2] };
  dnInfos[0][0] = DFSTestUtil.getLocalDatanodeInfo();
  dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
  dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
  String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
  StorageType[][] storageTypes = {{StorageType.DEFAULT},
      {StorageType.DEFAULT, StorageType.DEFAULT}};
  BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
      blocks, dnInfos, storageTypes, storageIDs);
  BlockCommandProto bcProto = PBHelper.convert(bc);
  BlockCommand bc2 = PBHelper.convert(bcProto);
  assertEquals(bc.getAction(), bc2.getAction());
  assertEquals(bc.getBlocks().length, bc2.getBlocks().length);
  Block[] blocks2 = bc2.getBlocks();
  for (int i = 0; i < blocks.length; i++) {
    assertEquals(blocks[i], blocks2[i]);
  }
  DatanodeInfo[][] dnInfos2 = bc2.getTargets();
  assertEquals(dnInfos.length, dnInfos2.length);
  for (int i = 0; i < dnInfos.length; i++) {
    DatanodeInfo[] d1 = dnInfos[i];
    DatanodeInfo[] d2 = dnInfos2[i];
    assertEquals(d1.length, d2.length);
    for (int j = 0; j < d1.length; j++) {
      compare(d1[j], d2[j]);
    }
  }
}
@Override
Double getUtilization(DatanodeStorageReport r, final StorageType t) {
  long capacity = 0L;
  long dfsUsed = 0L;
  for (StorageReport s : r.getStorageReports()) {
    if (s.getStorage().getStorageType() == t) {
      capacity += s.getCapacity();
      dfsUsed += s.getDfsUsed();
    }
  }
  return capacity == 0L ? null : dfsUsed * 100.0 / capacity;
}
@Override
void accumulateSpaces(DatanodeStorageReport r) {
  for (StorageReport s : r.getStorageReports()) {
    final StorageType t = s.getStorage().getStorageType();
    totalCapacities.add(t, s.getCapacity());
    totalUsedSpaces.add(t, s.getBlockPoolUsed());
  }
}
@Override
Double getUtilization(DatanodeStorageReport r, final StorageType t) {
  long capacity = 0L;
  long blockPoolUsed = 0L;
  for (StorageReport s : r.getStorageReports()) {
    if (s.getStorage().getStorageType() == t) {
      capacity += s.getCapacity();
      blockPoolUsed += s.getBlockPoolUsed();
    }
  }
  return capacity == 0L ? null : blockPoolUsed * 100.0 / capacity;
}
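// Worked example (made-up numbers): the two getUtilization overrides differ
// only in the numerator -- total DFS usage versus usage by a single block
// pool. For a node with two DISK volumes of 1024 GB each, 300 GB and 200 GB
// used, utilization = (300 + 200) * 100.0 / (1024 + 1024) ~= 24.4%.
// Returning null when capacity is 0 lets callers skip nodes that have no
// volume of the requested storage type.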
/**
 * Choose <i>numOfReplicas</i> nodes from the racks
 * that <i>localMachine</i> is NOT on.
 * If not enough nodes are available, choose the remaining ones
 * from the local rack.
 */
protected void chooseRemoteRack(int numOfReplicas,
    DatanodeDescriptor localMachine,
    Set<Node> excludedNodes,
    long blocksize,
    int maxReplicasPerRack,
    List<DatanodeStorageInfo> results,
    boolean avoidStaleNodes,
    EnumMap<StorageType, Integer> storageTypes)
    throws NotEnoughReplicasException {
  int oldNumOfReplicas = results.size();
  // Randomly choose nodes from remote racks.
  try {
    chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
        excludedNodes, blocksize, maxReplicasPerRack, results,
        avoidStaleNodes, storageTypes);
  } catch (NotEnoughReplicasException e) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Failed to choose remote rack (location = ~"
          + localMachine.getNetworkLocation() + "), fallback to local rack",
          e);
    }
    chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
        localMachine.getNetworkLocation(), excludedNodes, blocksize,
        maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
  }
}
/**
 * Choose a block and a proxy source for this pendingMove whose source and
 * target have already been chosen.
 *
 * @return true if a block and its proxy are chosen; false otherwise
 */
private boolean chooseBlockAndProxy() {
  // Source and target must have the same storage type.
  final StorageType t = source.getStorageType();
  // Iterate over the source's blocks until a good one is found.
  for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
    if (markMovedIfGoodBlock(i.next(), t)) {
      i.remove();
      return true;
    }
  }
  return false;
}
/**
 * Randomly choose one target from the given <i>scope</i>.
 * @return the chosen storage, if there is any.
 */
protected DatanodeStorageInfo chooseRandom(String scope,
    Set<Node> excludedNodes,
    long blocksize,
    int maxNodesPerRack,
    List<DatanodeStorageInfo> results,
    boolean avoidStaleNodes,
    EnumMap<StorageType, Integer> storageTypes)
    throws NotEnoughReplicasException {
  return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
      results, avoidStaleNodes, storageTypes);
}
@Test
public void testUseDelHint() {
  DatanodeStorageInfo delHint = new DatanodeStorageInfo(
      DFSTestUtil.getLocalDatanodeDescriptor(), new DatanodeStorage("id"));
  List<DatanodeStorageInfo> moreThan1Racks = Arrays.asList(delHint);
  List<StorageType> excessTypes = new ArrayList<StorageType>();

  excessTypes.add(StorageType.DEFAULT);
  Assert.assertTrue(BlockManager.useDelHint(true, delHint, null,
      moreThan1Racks, excessTypes));

  excessTypes.remove(0);
  excessTypes.add(StorageType.SSD);
  Assert.assertFalse(BlockManager.useDelHint(true, delHint, null,
      moreThan1Racks, excessTypes));
}