/** * Write an array of blocks as compactly as possible. This uses * delta-encoding for the generation stamp and size, following * the principle that genstamp increases relatively slowly, * and size is equal for all but the last block of a file. */ public static void writeCompactBlockArray( Block[] blocks, DataOutputStream out) throws IOException { WritableUtils.writeVInt(out, blocks.length); Block prev = null; for (Block b : blocks) { long szDelta = b.getNumBytes() - (prev != null ? prev.getNumBytes() : 0); long gsDelta = b.getGenerationStamp() - (prev != null ? prev.getGenerationStamp() : 0); out.writeLong(b.getBlockId()); // blockid is random WritableUtils.writeVLong(out, szDelta); WritableUtils.writeVLong(out, gsDelta); prev = b; } }
/**
 * Prints a summary line followed by, for every datanode that has at least
 * one block queued, the datanode and its set of blocks.
 *
 * @param out writer receiving the report
 */
synchronized void dump(final PrintWriter out) {
  final int datanodeCount = node2blocks.values().size();
  out.println("Metasave: Blocks " + numBlocks
      + " waiting deletion from " + datanodeCount + " datanodes.");
  if (datanodeCount != 0) {
    for (Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> e
        : node2blocks.entrySet()) {
      final LightWeightHashSet<Block> queued = e.getValue();
      if (queued.size() <= 0) {
        // Nothing pending for this datanode; do not print it.
        continue;
      }
      out.println(e.getKey());
      out.println(queued);
    }
  }
}
/**
 * Serializes this operation's fields to {@code out}.
 *
 * The inode id, path, replication, mtime, atime, block size, block array
 * and permissions are always written, in that order. When the opcode is
 * {@code OP_ADD}, the ACL entries, extended attributes, client name, client
 * machine, overwrite flag, storage policy id and RPC ids are appended.
 * The order of the writes defines the serialized layout, so it must not be
 * changed independently of the code that reads it back.
 *
 * @param out stream receiving the serialized fields
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void writeFields(DataOutputStream out) throws IOException {
  // Fields written unconditionally.
  FSImageSerialization.writeLong(inodeId, out);
  FSImageSerialization.writeString(path, out);
  FSImageSerialization.writeShort(replication, out);
  FSImageSerialization.writeLong(mtime, out);
  FSImageSerialization.writeLong(atime, out);
  FSImageSerialization.writeLong(blockSize, out);
  new ArrayWritable(Block.class, blocks).write(out);
  permissions.write(out);

  // Additional fields written only when this op is an OP_ADD.
  if (this.opCode == OP_ADD) {
    AclEditLogUtil.write(aclEntries, out);
    // Extended attributes are written as a length-delimited protobuf message.
    XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
    b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
    b.build().writeDelimitedTo(out);
    FSImageSerialization.writeString(clientName,out);
    FSImageSerialization.writeString(clientMachine,out);
    FSImageSerialization.writeBoolean(overwrite, out);
    FSImageSerialization.writeByte(storagePolicyId, out);
    // write clientId and callId
    writeRpcIds(rpcClientId, rpcCallId, out);
  }
}
/**
 * Test that getInvalidateBlocks observes the maxlimit.
 *
 * Queues {@code MAX_BLOCKS} blocks for invalidation, then checks that the
 * first call returns exactly {@code MAX_LIMIT} blocks and that the second
 * call returns the remaining ones.
 */
@Test
public void testGetInvalidateBlocks() throws Exception {
  final int MAX_BLOCKS = 10;
  final int REMAINING_BLOCKS = 2;
  final int MAX_LIMIT = MAX_BLOCKS - REMAINING_BLOCKS;

  DatanodeDescriptor dd = DFSTestUtil.getLocalDatanodeDescriptor();
  ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
  for (int i = 0; i < MAX_BLOCKS; i++) {
    blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
  }
  dd.addBlocksToBeInvalidated(blockList);

  // assertEquals takes (expected, actual); keeping that order makes the
  // failure message report the right value as "expected".
  Block[] bc = dd.getInvalidateBlocks(MAX_LIMIT);
  assertEquals(MAX_LIMIT, bc.length);
  bc = dd.getInvalidateBlocks(MAX_LIMIT);
  assertEquals(REMAINING_BLOCKS, bc.length);
}
/**
 * Queues {@code block} for invalidation on {@code datanode}.
 *
 * A per-datanode set is created on first use. The block counter is only
 * incremented, and the optional log line only emitted, when the block was
 * not already present in that datanode's set.
 *
 * @param block block to invalidate
 * @param datanode datanode on which the block is to be invalidated
 * @param log whether to log a successful addition
 */
synchronized void add(final Block block, final DatanodeInfo datanode,
    final boolean log) {
  LightWeightHashSet<Block> pending = node2blocks.get(datanode);
  if (pending == null) {
    pending = new LightWeightHashSet<Block>();
    node2blocks.put(datanode, pending);
  }

  if (!pending.add(block)) {
    // Already queued for this datanode; nothing to count or log.
    return;
  }

  numBlocks++;
  if (log) {
    NameNode.blockStateChangeLog.info("BLOCK* {}: add {} to {}",
        getClass().getSimpleName(), block, datanode);
  }
}
boolean unprotectedRemoveBlock(String path, INodesInPath iip, INodeFile fileNode, Block block) throws IOException { // modify file-> block and blocksMap // fileNode should be under construction boolean removed = fileNode.removeLastBlock(block); if (!removed) { return false; } getBlockManager().removeBlockFromMap(block); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: " +path+" with "+block +" block is removed from the file system"); } // update space consumed updateCount(iip, 0, -fileNode.getPreferredBlockSize(), fileNode.getBlockReplication(), true); return true; }
/** * Remove a block from the under replication queues. * * The priLevel parameter is a hint of which queue to query * first: if negative or >= {@link #LEVEL} this shortcutting * is not attmpted. * * If the block is not found in the nominated queue, an attempt is made to * remove it from all queues. * * <i>Warning:</i> This is not a synchronized method. * @param block block to remove * @param priLevel expected privilege level * @return true if the block was found and removed from one of the priority queues */ boolean remove(Block block, int priLevel) { if(priLevel >= 0 && priLevel < LEVEL && priorityQueues.get(priLevel).remove(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" + " from priority queue {}", block, priLevel); return true; } else { // Try to remove the block from all queues if the block was // not found in the queue for the given priority level. for (int i = 0; i < LEVEL; i++) { if (priorityQueues.get(i).remove(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" + " {} from priority queue {}", block, priLevel); return true; } } } return false; }
/**
 * Find the meta-file for the specified block file
 * and then return the generation stamp from the name of the meta-file.
 *
 * The first entry of {@code listdir} whose name starts with the block
 * file's name, and which is not the block file itself, is treated as the
 * meta-file.
 *
 * @param listdir directory listing to search
 * @param blockFile block file whose meta-file is wanted
 * @return the generation stamp parsed from the meta-file name, or
 *         {@link GenerationStamp#GRANDFATHER_GENERATION_STAMP} (after
 *         logging a warning) when no candidate is found
 */
static long getGenerationStampFromFile(File[] listdir, File blockFile) {
  final String blockName = blockFile.getName();
  for (int j = 0; j < listdir.length; j++) {
    final File candidate = listdir[j];
    final String candidateName = candidate.getName();
    if (!candidateName.startsWith(blockName)) {
      continue;
    }
    // Compare by path rather than by reference, so that a File object that
    // merely denotes the same path as the block file is also skipped.
    if (blockFile.equals(candidate)) {
      continue;
    }
    return Block.getGenerationStamp(candidateName);
  }
  FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
  return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
}
@Override // FsDatasetSpi public synchronized ReplicaHandler recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); if ( binfo == null) { throw new ReplicaNotFoundException("Block " + b + " does not exist, and cannot be appended to."); } if (binfo.isFinalized()) { throw new ReplicaAlreadyExistsException("Block " + b + " is valid, and cannot be written to."); } map.remove(b); binfo.theBlock.setGenerationStamp(newGS); map.put(binfo.theBlock, binfo); return new ReplicaHandler(binfo, null); }
/**
 * Writes one file inode to {@code out} as a length-delimited
 * {@code INodeSection.INode} message of type FILE.
 *
 * The message carries the file's blocks (when it has any) and, if the file
 * has an under-construction feature, the client name and client machine
 * recorded in it.
 *
 * @param out stream receiving the message
 * @param n file inode to serialize
 * @throws IOException if writing fails
 */
private void save(OutputStream out, INodeFile n) throws IOException {
  final INodeSection.INodeFile.Builder fileBuilder =
      buildINodeFile(n, parent.getSaverContext());

  if (n.getBlocks() != null) {
    for (Block blk : n.getBlocks()) {
      fileBuilder.addBlocks(PBHelper.convert(blk));
    }
  }

  final FileUnderConstructionFeature ucFeature =
      n.getFileUnderConstructionFeature();
  if (ucFeature != null) {
    final INodeSection.FileUnderConstructionFeature ucProto =
        INodeSection.FileUnderConstructionFeature.newBuilder()
            .setClientName(ucFeature.getClientName())
            .setClientMachine(ucFeature.getClientMachine())
            .build();
    fileBuilder.setFileUC(ucProto);
  }

  final INodeSection.INode inode = buildINodeCommon(n)
      .setType(INodeSection.INode.Type.FILE)
      .setFile(fileBuilder)
      .build();
  inode.writeDelimitedTo(out);
}
/** * Test that the block type (legacy or not) can be correctly detected * based on its generation stamp. * * @throws IOException */ @Test public void testBlockTypeDetection() throws IOException { // Setup a mock object and stub out a few routines to // retrieve the generation stamp counters. BlockIdManager bid = mock(BlockIdManager.class); final long maxGenStampForLegacyBlocks = 10000; when(bid.getGenerationStampV1Limit()) .thenReturn(maxGenStampForLegacyBlocks); Block legacyBlock = spy(new Block()); when(legacyBlock.getGenerationStamp()) .thenReturn(maxGenStampForLegacyBlocks/2); Block newBlock = spy(new Block()); when(newBlock.getGenerationStamp()) .thenReturn(maxGenStampForLegacyBlocks+1); // Make sure that isLegacyBlock() can correctly detect // legacy and new blocks. when(bid.isLegacyBlock(any(Block.class))).thenCallRealMethod(); assertThat(bid.isLegacyBlock(legacyBlock), is(true)); assertThat(bid.isLegacyBlock(newBlock), is(false)); }
/**
 * Prints the number of pending replications followed by one line per
 * pending block (block, start time, replicas in progress), while holding
 * the lock on {@code pendingReplications}.
 *
 * @param out writer receiving the report
 */
void metaSave(PrintWriter out) {
  synchronized (pendingReplications) {
    out.println("Metasave: Blocks being replicated: " +
        pendingReplications.size());
    for (Map.Entry<Block, PendingBlockInfo> pending
        : pendingReplications.entrySet()) {
      final PendingBlockInfo info = pending.getValue();
      final Block blk = pending.getKey();
      out.println(blk
          + " StartTime: " + new Time(info.timeStamp)
          + " NumReplicaInProgress: " + info.getNumReplicas());
    }
  }
}
@Test public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget() throws IOException { INodeFile file = mockFileUnderConstruction(); Block block = new Block(blockId, length, genStamp); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); DatanodeID[] newTargets = new DatanodeID[]{ new DatanodeID("0.0.0.0", "nonexistantHost", "1", 0, 0, 0, 0)}; ExtendedBlock lastBlock = new ExtendedBlock(); namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, true, false, newTargets, null); // Repeat the call to make sure it returns true namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, true, false, newTargets, null); }
/** * Add a new block into the given INodeFile */ private void addNewBlock(FSDirectory fsDir, AddBlockOp op, INodeFile file) throws IOException { BlockInfoContiguous[] oldBlocks = file.getBlocks(); Block pBlock = op.getPenultimateBlock(); Block newBlock= op.getLastBlock(); if (pBlock != null) { // the penultimate block is not null Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0); // compare pBlock with the last block of oldBlocks Block oldLastBlock = oldBlocks[oldBlocks.length - 1]; if (oldLastBlock.getBlockId() != pBlock.getBlockId() || oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) { throw new IOException( "Mismatched block IDs or generation stamps for the old last block of file " + op.getPath() + ", the old last block is " + oldLastBlock + ", and the block read from editlog is " + pBlock); } oldLastBlock.setNumBytes(pBlock.getNumBytes()); if (oldLastBlock instanceof BlockInfoContiguousUnderConstruction) { fsNamesys.getBlockManager().forceCompleteBlock(file, (BlockInfoContiguousUnderConstruction) oldLastBlock); fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock); } } else { // the penultimate block is null Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0); } // add the new block BlockInfoContiguous newBI = new BlockInfoContiguousUnderConstruction( newBlock, file.getBlockReplication()); fsNamesys.getBlockManager().addBlockCollection(newBI, file); file.addBlock(newBI); fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock); }
@Override // NumberGenerator public long nextValue() { Block b = new Block(super.nextValue()); // There may be an occasional conflict with randomly generated // block IDs. Skip over the conflicts. while(isValidBlock(b)) { b.setBlockId(super.nextValue()); } return b.getBlockId(); }
/** * Test for ReplicasMap.get(Block) and ReplicasMap.get(long) tests */ @Test public void testGet() { // Test 1: null argument throws invalid argument exception try { map.get(bpid, null); fail("Expected exception not thrown"); } catch (IllegalArgumentException expected) { } // Test 2: successful lookup based on block assertNotNull(map.get(bpid, block)); // Test 3: Lookup failure - generation stamp mismatch Block b = new Block(block); b.setGenerationStamp(0); assertNull(map.get(bpid, b)); // Test 4: Lookup failure - blockID mismatch b.setGenerationStamp(block.getGenerationStamp()); b.setBlockId(0); assertNull(map.get(bpid, b)); // Test 5: successful lookup based on block ID assertNotNull(map.get(bpid, block.getBlockId())); // Test 6: failed lookup for invalid block ID assertNull(map.get(bpid, 0)); }
/**
 * Test if {@link BlockManager#computeInvalidateWork(int)}
 * can schedule invalidate work correctly.
 *
 * Every node gets {@code 3 * blockInvalidateLimit + 1} blocks queued, then
 * the amount of work returned for different node counts is checked. All of
 * it runs under the namesystem write lock.
 */
@Test(timeout=120000)
public void testCompInvalidate() throws Exception {
  final int blockInvalidateLimit = bm.getDatanodeManager()
      .blockInvalidateLimit;
  namesystem.writeLock();
  try {
    for (int i = 0; i < nodes.length; i++) {
      for (int j = 0; j < 3 * blockInvalidateLimit + 1; j++) {
        Block block = new Block(i * (blockInvalidateLimit + 1) + j, 0,
            GenerationStamp.LAST_RESERVED_STAMP);
        bm.addToInvalidates(block, nodes[i]);
      }
    }

    assertEquals(blockInvalidateLimit * NUM_OF_DATANODES,
        bm.computeInvalidateWork(NUM_OF_DATANODES + 1));
    assertEquals(blockInvalidateLimit * NUM_OF_DATANODES,
        bm.computeInvalidateWork(NUM_OF_DATANODES));
    assertEquals(blockInvalidateLimit * (NUM_OF_DATANODES - 1),
        bm.computeInvalidateWork(NUM_OF_DATANODES - 1));

    int workCount = bm.computeInvalidateWork(1);
    if (workCount == 1) {
      assertEquals(blockInvalidateLimit + 1, bm.computeInvalidateWork(2));
    } else {
      // assertEquals takes (expected, actual): the limit is the expectation.
      assertEquals(blockInvalidateLimit, workCount);
      assertEquals(2, bm.computeInvalidateWork(2));
    }
  } finally {
    namesystem.writeUnlock();
  }
}
/** * Get blocks to invalidate for <i>nodeId</i> * in {@link #invalidateBlocks}. * * @return number of blocks scheduled for removal during this iteration. */ private int invalidateWorkForOneNode(DatanodeInfo dn) { final List<Block> toInvalidate; namesystem.writeLock(); try { // blocks should not be replicated or removed if safe mode is on if (namesystem.isInSafeMode()) { LOG.debug("In safemode, not computing replication work"); return 0; } try { DatanodeDescriptor dnDescriptor = datanodeManager.getDatanode(dn); if (dnDescriptor == null) { LOG.warn("DataNode " + dn + " cannot be found with UUID " + dn.getDatanodeUuid() + ", removing block invalidation work."); invalidateBlocks.remove(dn); return 0; } toInvalidate = invalidateBlocks.invalidateWork(dnDescriptor); if (toInvalidate == null) { return 0; } } catch(UnregisteredNodeException une) { return 0; } } finally { namesystem.writeUnlock(); } blockLog.info("BLOCK* {}: ask {} to delete {}", getClass().getSimpleName(), dn, toInvalidate); return toInvalidate.size(); }
/**
 * Builds the recovery information for the replica of the given block.
 *
 * @param rBlock block under recovery
 * @return block id, bytes on disk, generation stamp and state of the stored
 *         replica; the state is FINALIZED when the replica is finalized and
 *         RBW otherwise
 * @throws IOException if the block is not present in its pool's map
 */
@Override
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
    throws IOException {
  final ExtendedBlock extended = rBlock.getBlock();
  final Map<Block, BInfo> poolMap = getMap(extended.getBlockPoolId());
  final BInfo stored = poolMap.get(extended.getLocalBlock());
  if (stored == null) {
    throw new IOException("No such Block " + extended );
  }

  final long id = stored.getBlockId();
  final long bytesOnDisk = stored.getBytesOnDisk();
  final long genStamp = stored.getGenerationStamp();
  final ReplicaState state;
  if (stored.isFinalized()) {
    state = ReplicaState.FINALIZED;
  } else {
    state = ReplicaState.RBW;
  }
  return new ReplicaRecoveryInfo(id, bytesOnDisk, genStamp, state);
}
/**
 * Removes {@code block} from the set queued for {@code dn}.
 *
 * The block counter is decremented only when the block was actually
 * present, and the datanode's entry is dropped once its set is empty.
 *
 * @param dn datanode whose set is updated
 * @param block block to remove
 */
synchronized void remove(final DatanodeInfo dn, final Block block) {
  final LightWeightHashSet<Block> queued = node2blocks.get(dn);
  if (queued == null || !queued.remove(block)) {
    // Unknown datanode, or the block was not queued for it.
    return;
  }
  numBlocks--;
  if (queued.isEmpty()) {
    node2blocks.remove(dn);
  }
}
/**
 * Converts a block to its protobuf form and back, and expects the result
 * to equal the original block.
 */
@Test
public void testConvertBlock() {
  final Block original = new Block(1, 100, 3);
  final BlockProto proto = PBHelper.convert(original);
  final Block roundTripped = PBHelper.convert(proto);
  assertEquals(original, roundTripped);
}
/**
 * Simulates a failure by deleting every block file directly inside
 * {@code dir}, i.e. every entry whose name starts with
 * {@link Block#BLOCK_FILE_PREFIX}.
 *
 * @param dir directory to clean
 * @return {@code true} if every matching file was deleted; {@code false}
 *         as soon as one deletion fails, or if {@code dir} cannot be
 *         listed (it does not exist, is not a directory, or an I/O error
 *         occurred)
 */
private boolean deteteBlocks(File dir) {
  File[] fileList = dir.listFiles();
  if (fileList == null) {
    // listFiles() returns null instead of throwing when dir is unusable.
    return false;
  }
  for (File f : fileList) {
    if (f.getName().startsWith(Block.BLOCK_FILE_PREFIX) && !f.delete()) {
      return false;
    }
  }
  return true;
}
/**
 * Creates a replica entry that is initially not chosen as primary.
 *
 * @param block block passed on to the superclass constructor
 * @param target storage recorded as the expected location
 * @param state initial replica state
 */
ReplicaUnderConstruction(Block block,
                         DatanodeStorageInfo target,
                         ReplicaState state) {
  super(block);
  this.state = state;
  this.expectedLocation = target;
  this.chosenAsPrimary = false;
}
/**
 * Emits this op as XML: the PATH element, one entry per block, and finally
 * the RPC ids.
 *
 * @param contentHandler SAX handler receiving the output
 * @throws SAXException if the handler reports an error
 */
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
  XMLUtils.addSaxString(contentHandler, "PATH", path);
  for (Block blk : blocks) {
    FSEditLogOp.blockToXml(contentHandler, blk);
  }
  appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
/**
 * Looks up the replica that matches both the block id and the generation
 * stamp of {@code block}.
 *
 * @param bpid block pool id
 * @param block block with its id as the key
 * @return the replica's meta information, or {@code null} when no replica
 *         with that id exists or its generation stamp differs
 * @throws IllegalArgumentException if the input block or block pool is null
 */
ReplicaInfo get(String bpid, Block block) {
  checkBlockPool(bpid);
  checkBlock(block);
  final ReplicaInfo candidate = get(bpid, block.getBlockId());
  if (candidate == null) {
    return null;
  }
  final boolean sameGenStamp =
      block.getGenerationStamp() == candidate.getGenerationStamp();
  return sameGenStamp ? candidate : null;
}
/**
 * Builds a {@link Block} from the BLOCK_ID, NUM_BYTES and GENSTAMP values
 * of a stanza, each parsed as a decimal long.
 *
 * @param st stanza holding the three values
 * @return the reconstructed block
 * @throws InvalidXmlException declared for stanza access
 */
public static Block blockFromXml(Stanza st)
    throws InvalidXmlException {
  // Arguments are evaluated left to right: id, length, generation stamp.
  return new Block(
      Long.parseLong(st.getValue("BLOCK_ID")),
      Long.parseLong(st.getValue("NUM_BYTES")),
      Long.parseLong(st.getValue("GENSTAMP")));
}
@Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { final Map<Block, BInfo> map = blockMap.get(bpid); if (map != null) { BInfo binfo = map.get(new Block(blkid)); if (binfo == null) { return null; } return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes()); } return null; }
/**
 * Replaces {@code file} with a fresh copy of itself.
 *
 * The content is copied into a temporary file created in the same
 * directory, the sizes are compared, and the temporary file then replaces
 * the original, which causes any hardlinks to the original file to be
 * removed. The temporary files will be recovered (especially on Windows)
 * on datanode restart.
 *
 * @param file file to unlink
 * @param b block passed to the temporary-file creation helper
 * @throws IOException if copying or replacing fails, or if the copy has a
 *         different size; the temporary file is deleted on a best-effort
 *         basis before the exception is rethrown
 */
private void unlinkFile(File file, Block b) throws IOException {
  final File tmpFile =
      DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
  try {
    final FileInputStream source = new FileInputStream(file);
    try {
      final FileOutputStream sink = new FileOutputStream(tmpFile);
      try {
        IOUtils.copyBytes(source, sink, 16*1024);
      } finally {
        sink.close();
      }
    } finally {
      source.close();
    }

    // A size mismatch means the copy is not usable as a replacement.
    if (file.length() != tmpFile.length()) {
      throw new IOException("Copy of file " + file + " size " + file.length()+
                            " into file " + tmpFile +
                            " resulted in a size of " + tmpFile.length());
    }
    FileUtil.replaceFile(tmpFile, file);
  } catch (IOException ioe) {
    final boolean deleted = tmpFile.delete();
    if (!deleted) {
      DataNode.LOG.info("detachFile failed to delete temporary file " +
                        tmpFile);
    }
    throw ioe;
  }
}
/**
 * Counts the NORMAL-state storages holding block {@code b}, grouped by
 * replica state.
 *
 * Each storage falls into exactly one of corrupt, decommissioned, excess
 * or live (checked in that order). Independently of that, storages whose
 * block contents are stale are also counted as stale.
 *
 * @param b block to count replicas for
 * @return the per-state counts
 */
public NumberReplicas countNodes(Block b) {
  int liveCount = 0;
  int decommissionedCount = 0;
  int corruptCount = 0;
  int excessCount = 0;
  int staleCount = 0;

  final Collection<DatanodeDescriptor> corruptNodes =
      corruptReplicas.getNodes(b);
  for (DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
    if (corruptNodes != null && corruptNodes.contains(dn)) {
      corruptCount++;
    } else if (dn.isDecommissionInProgress() || dn.isDecommissioned()) {
      decommissionedCount++;
    } else {
      final LightWeightLinkedSet<Block> excessOnNode =
          excessReplicateMap.get(dn.getDatanodeUuid());
      final boolean isExcess =
          excessOnNode != null && excessOnNode.contains(b);
      if (isExcess) {
        excessCount++;
      } else {
        liveCount++;
      }
    }
    // Staleness is tracked on top of the classification above.
    if (storage.areBlockContentsStale()) {
      staleCount++;
    }
  }
  return new NumberReplicas(liveCount, decommissionedCount, corruptCount,
      excessCount, staleCount);
}
@Test(timeout=300000) public void blockReport_09() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; final int bytesChkSum = 1024 * 1000; conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum); shutDownCluster(); startUpCluster(); // write file and start second node to be "older" than the original try { writeFile(METHOD_NAME, 12 * bytesChkSum, filePath); Block bl = findBlock(filePath, 12 * bytesChkSum); BlockChecker bc = new BlockChecker(filePath); bc.start(); waitForTempReplica(bl, DN_N1); // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true); sendBlockReports(dnR, poolId, reports); printStats(); assertEquals("Wrong number of PendingReplication blocks", 2, cluster.getNamesystem().getPendingReplicationBlocks()); try { bc.join(); } catch (InterruptedException e) {} } finally { resetConfiguration(); // return the initial state of the configuration } }
@Test public void testCommitBlockSynchronization() throws IOException { INodeFile file = mockFileUnderConstruction(); Block block = new Block(blockId, length, genStamp); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); DatanodeID[] newTargets = new DatanodeID[0]; ExtendedBlock lastBlock = new ExtendedBlock(); namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, false, false, newTargets, null); // Repeat the call to make sure it does not throw namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, false, false, newTargets, null); // Simulate 'completing' the block. BlockInfoContiguous completedBlockInfo = new BlockInfoContiguous(block, (short) 1); completedBlockInfo.setBlockCollection(file); completedBlockInfo.setGenerationStamp(genStamp); doReturn(completedBlockInfo).when(namesystemSpy) .getStoredBlock(any(Block.class)); doReturn(completedBlockInfo).when(file).getLastBlock(); // Repeat the call to make sure it does not throw namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, false, false, newTargets, null); }
/**
 * Tells whether block {@code b} has enough racks.
 *
 * Always {@code true} when rack checking is disabled. Otherwise only
 * storages on datanodes that are neither decommissioning/decommissioned
 * nor holding a corrupt replica are considered. The first such storage is
 * sufficient when one replica is expected, or when more are expected but
 * the cluster has never been multi-rack. In all other cases two considered
 * storages with different network locations are required.
 *
 * @param b block to check
 * @return whether the rack requirement is met
 */
boolean blockHasEnoughRacks(Block b) {
  if (!this.shouldCheckForEnoughRacks) {
    return true;
  }
  final Collection<DatanodeDescriptor> corruptNodes =
      corruptReplicas.getNodes(b);
  final int expectedReplicas = getReplication(b);
  String firstRack = null;

  for (DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
    final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
    if (dn.isDecommissionInProgress() || dn.isDecommissioned()) {
      continue;
    }
    if (corruptNodes != null && corruptNodes.contains(dn)) {
      continue;
    }

    // One usable replica is enough in these two situations.
    if (expectedReplicas == 1 ||
        (expectedReplicas > 1 &&
            !datanodeManager.hasClusterEverBeenMultiRack())) {
      return true;
    }

    final String rack = dn.getNetworkLocation();
    if (firstRack == null) {
      firstRack = rack;
    } else if (!firstRack.equals(rack)) {
      // A second, different rack has been seen.
      return true;
    }
  }
  return false;
}
/**
 * Builds an {@link ExtendedBlock} from the first block metadata file found
 * under {@code dataDir}.
 *
 * @param bpid block pool id to put into the result
 * @param dataDir directory searched for block metadata files
 * @return a block carrying the id parsed from the block file name, the
 *         block file's length and the generation stamp parsed from the
 *         metadata file name; {@code null} if no metadata file is found
 */
public static ExtendedBlock getBlock(String bpid, File dataDir) {
  final List<File> metaFiles =
      MiniDFSCluster.getAllBlockMetadataFiles(dataDir);
  if (metaFiles == null || metaFiles.isEmpty()) {
    return null;
  }

  final File meta = metaFiles.get(0);
  final File blockFile = Block.metaToBlockFile(meta);
  final long blockId = Block.getBlockId(blockFile.getName());
  final long length = blockFile.length();
  final long genStamp = Block.getGenerationStamp(meta.getName());
  return new ExtendedBlock(bpid, blockId, length, genStamp);
}
/** * Transfer a replica to the datanode targets. * @param b the block to transfer. * The corresponding replica must be an RBW or a Finalized. * Its GS and numBytes will be set to * the stored GS and the visible length. * @param targets targets to transfer the block to * @param client client name */ void transferReplicaForPipelineRecovery(final ExtendedBlock b, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final String client) throws IOException { final long storedGS; final long visible; final BlockConstructionStage stage; //get replica information synchronized(data) { Block storedBlock = data.getStoredBlock(b.getBlockPoolId(), b.getBlockId()); if (null == storedBlock) { throw new IOException(b + " not found in datanode."); } storedGS = storedBlock.getGenerationStamp(); if (storedGS < b.getGenerationStamp()) { throw new IOException(storedGS + " = storedGS < b.getGenerationStamp(), b=" + b); } // Update the genstamp with storedGS b.setGenerationStamp(storedGS); if (data.isValidRbw(b)) { stage = BlockConstructionStage.TRANSFER_RBW; } else if (data.isValidBlock(b)) { stage = BlockConstructionStage.TRANSFER_FINALIZED; } else { final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId()); throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r); } visible = data.getReplicaVisibleLength(b); } //set visible length b.setNumBytes(visible); if (targets.length > 0) { new DataTransfer(targets, targetStorageTypes, b, stage, client).run(); } }
/**
 * Removes the listed blocks from the block manager in batches.
 *
 * The write lock is taken for at most BLOCK_DELETION_INCREMENT removals
 * and then released before the next batch, to ensure that other waiters
 * on the lock can get in. See HDFS-2938
 *
 * @param blocks
 *          An instance of {@link BlocksMapUpdateInfo} which contains a list
 *          of blocks that need to be removed from blocksMap
 */
void removeBlocks(BlocksMapUpdateInfo blocks) {
  final Iterator<Block> remaining = blocks.getToDeleteList().iterator();
  while (remaining.hasNext()) {
    writeLock();
    try {
      int removedInBatch = 0;
      while (removedInBatch < BLOCK_DELETION_INCREMENT
          && remaining.hasNext()) {
        blockManager.removeBlock(remaining.next());
        removedInBatch++;
      }
    } finally {
      writeUnlock();
    }
  }
}
/**
 * Looks up the replica with the given block id in a block pool.
 *
 * @param bpid block pool id
 * @param blockId id of the block
 * @return the stored replica, or {@code null} if the pool or the block is
 *         unknown
 */
@Override
@Deprecated
public Replica getReplica(String bpid, long blockId) {
  final Map<Block, BInfo> poolMap = blockMap.get(bpid);
  return poolMap == null ? null : poolMap.get(new Block(blockId));
}
@Override public void decrementSafeBlockCount(Block b) { // safeMode is volatile, and may be set to null at any time SafeModeInfo safeMode = this.safeMode; if (safeMode == null) // mostly true return; BlockInfoContiguous storedBlock = getStoredBlock(b); if (storedBlock.isComplete()) { safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas()); } }
@Override // FsDatasetSpi public synchronized long getLength(ExtendedBlock b) throws IOException { final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new IOException("Finalizing a non existing block " + b); } return binfo.getNumBytes(); }
/**
 * Returns the datanodes recorded as holding a corrupt replica of a block.
 *
 * @param blk Block for which nodes are requested
 * @return the key set of the block's entry in the corrupt-replicas map, or
 *         {@code null} if the block has no entry
 */
Collection<DatanodeDescriptor> getNodes(Block blk) {
  final Map<DatanodeDescriptor, Reason> reasonsByNode =
      corruptReplicasMap.get(blk);
  return reasonsByNode == null ? null : reasonsByNode.keySet();
}
/**
 * Registers pending replication work for a block.
 *
 * A block seen for the first time gets a new entry built from the targets;
 * for a block that is already pending, the targets are added to the
 * existing entry and its timestamp is refreshed. Runs while holding the
 * lock on {@code pendingReplications}.
 *
 * @param block The corresponding block
 * @param targets The DataNodes where replicas of the block should be placed
 */
void increment(Block block, DatanodeDescriptor[] targets) {
  synchronized (pendingReplications) {
    final PendingBlockInfo existing = pendingReplications.get(block);
    if (existing != null) {
      existing.incrementReplicas(targets);
      existing.setTimeStamp();
      return;
    }
    pendingReplications.put(block, new PendingBlockInfo(targets));
  }
}