/**
 * Access each block of the file via its 2nd DataNode until one access fails,
 * triggering the volume failure under test.
 * @param path the file whose blocks are accessed
 * @param size the file size
 * @throws IOException
 */
private void triggerFailure(String path, long size) throws IOException {
  NamenodeProtocols nn = cluster.getNameNodeRpc();
  List<LocatedBlock> locatedBlocks =
      nn.getBlockLocations(path, 0, size).getLocatedBlocks();

  for (LocatedBlock lb : locatedBlocks) {
    DatanodeInfo dinfo = lb.getLocations()[1];
    ExtendedBlock b = lb.getBlock();
    try {
      accessBlock(dinfo, lb);
    } catch (IOException e) {
      System.out.println("Failure triggered, on block: " + b.getBlockId() +
          "; corresponding volume should be removed by now");
      break;
    }
  }
}
private LocatedBlock createLocatedBlock() {
  DatanodeInfo[] dnInfos = {
      DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1",
          AdminStates.DECOMMISSION_INPROGRESS),
      DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
          AdminStates.DECOMMISSIONED),
      DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
          AdminStates.NORMAL),
      DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4",
          AdminStates.NORMAL),
  };
  String[] storageIDs = {"s1", "s2", "s3", "s4"};
  StorageType[] media = {
      StorageType.DISK,
      StorageType.SSD,
      StorageType.DISK,
      StorageType.RAM_DISK
  };
  LocatedBlock lb = new LocatedBlock(
      new ExtendedBlock("bp12", 12345, 10, 53),
      dnInfos, storageIDs, media, 5, false, new DatanodeInfo[]{});
  lb.setBlockToken(new Token<BlockTokenIdentifier>(
      "identifier".getBytes(), "password".getBytes(), new Text("kind"),
      new Text("service")));
  return lb;
}
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
    final LocatedBlock block, final long start, final long end,
    final ByteBuffer bb,
    final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
    final int hedgedReadId) {
  final Span parentSpan = Trace.currentSpan();
  return new Callable<ByteBuffer>() {
    @Override
    public ByteBuffer call() throws Exception {
      byte[] buf = bb.array();
      int offset = bb.position();
      TraceScope scope =
          Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
      try {
        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
            corruptedBlockMap);
        return bb;
      } finally {
        scope.close();
      }
    }
  };
}
/**
 * DFSInputStream reports checksum failure.
 * Case I : client has tried multiple data nodes and at least one of the
 * attempts has succeeded. We report the other failures as corrupted blocks
 * to the namenode.
 * Case II: client has tried out all data nodes, but all failed. We
 * only report if the total number of replicas is 1. We do not report
 * otherwise, since the failures may be caused by the client itself being
 * unable to read rather than by corrupt replicas.
 * @param corruptedBlockMap map of corrupted blocks
 * @param dataNodeCount number of data nodes that hold the block replicas
 */
private void reportCheckSumFailure(
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
    int dataNodeCount) {
  if (corruptedBlockMap.isEmpty()) {
    return;
  }
  Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
      .entrySet().iterator();
  Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
  ExtendedBlock blk = entry.getKey();
  Set<DatanodeInfo> dnSet = entry.getValue();
  if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
      || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
    DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
    int i = 0;
    for (DatanodeInfo dn : dnSet) {
      locs[i++] = dn;
    }
    LocatedBlock[] lblocks = { new LocatedBlock(blk, locs) };
    dfsClient.reportChecksumFailure(src, lblocks);
  }
  corruptedBlockMap.clear();
}
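The guard condition above is compact; the following standalone sketch restates the same boolean logic with hypothetical values so the two cases are easier to see. It is not HDFS code, only a readability aid.

// Minimal sketch of the reporting predicate in reportCheckSumFailure.
// All values are hypothetical; run with `java -ea` to enable the asserts.
public class ChecksumReportRule {
  static boolean shouldReport(int corruptReplicas, int dataNodeCount) {
    // Case I: some replicas failed but at least one succeeded.
    boolean partialFailure =
        corruptReplicas > 0 && corruptReplicas < dataNodeCount;
    // Case II: every replica failed, but there was only one to begin with.
    boolean singleReplicaFailure =
        dataNodeCount == 1 && corruptReplicas == dataNodeCount;
    return partialFailure || singleReplicaFailure;
  }

  public static void main(String[] args) {
    assert shouldReport(1, 3);   // one bad replica out of three: report
    assert !shouldReport(3, 3);  // all three failed: likely a client problem
    assert shouldReport(1, 1);   // the only replica failed: report
  }
}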
@Override
public LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
    String[] favoredNodes)
    throws AccessControlException, FileNotFoundException,
    NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
    IOException {
  AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
      .setSrc(src).setClientName(clientName).setFileId(fileId);
  if (previous != null)
    req.setPrevious(PBHelper.convert(previous));
  if (excludeNodes != null)
    req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
  if (favoredNodes != null) {
    req.addAllFavoredNodes(Arrays.asList(favoredNodes));
  }
  try {
    return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
/** Check if access should be allowed. userID is not checked if null. */
public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
    ExtendedBlock block, AccessMode mode) throws InvalidToken {
  BlockTokenIdentifier id = new BlockTokenIdentifier();
  try {
    id.readFields(new DataInputStream(new ByteArrayInputStream(token
        .getIdentifier())));
  } catch (IOException e) {
    throw new InvalidToken(
        "Unable to de-serialize block token identifier for user=" + userId
            + ", block=" + block + ", access mode=" + mode);
  }
  checkAccess(id, userId, block, mode);
  if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
    throw new InvalidToken("Block token with " + id.toString()
        + " doesn't have the correct token password");
  }
}
LocalDatanodeInfo() {
  final int cacheSize = 10000;
  final float hashTableLoadFactor = 0.75f;
  int hashTableCapacity =
      (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
  cache = Collections
      .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
          hashTableCapacity, hashTableLoadFactor, true) {
        private static final long serialVersionUID = 1;

        @Override
        protected boolean removeEldestEntry(
            Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
          return size() > cacheSize;
        }
      });
}
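This constructor builds a bounded LRU cache out of an access-ordered LinkedHashMap. A generic, self-contained sketch of the same JDK pattern, with placeholder key/value types and a tiny capacity so the eviction is visible:

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Generic sketch of the access-ordered LinkedHashMap LRU pattern used by
// LocalDatanodeInfo above; types and capacity here are placeholders.
public class LruCacheSketch {
  public static void main(String[] args) {
    final int cacheSize = 3;
    Map<String, Integer> cache = Collections.synchronizedMap(
        new LinkedHashMap<String, Integer>(cacheSize * 4 / 3 + 1, 0.75f,
            true /* access order: get() refreshes an entry */) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, Integer> e) {
            return size() > cacheSize; // evict once the bound is exceeded
          }
        });
    cache.put("a", 1);
    cache.put("b", 2);
    cache.put("c", 3);
    cache.get("a");      // touch "a" so "b" becomes the eldest entry
    cache.put("d", 4);   // evicts "b"
    System.out.println(cache.keySet()); // prints [c, a, d]
  }
}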
@Test
public void testAppend() throws Exception {
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
  try {
    cluster.waitActive();
    DataNode dn = cluster.getDataNodes().get(0);
    FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);

    // set up replicasMap
    String bpid = cluster.getNamesystem().getBlockPoolId();
    ExtendedBlock[] blocks = setup(bpid, dataSet);

    // test append
    testAppend(bpid, dataSet, blocks);
  } finally {
    cluster.shutdown();
  }
}
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
    throws IOException {
  synchronized(this) {
    final Replica replica = volumeMap.get(block.getBlockPoolId(),
        block.getBlockId());
    if (replica == null) {
      throw new ReplicaNotFoundException(block);
    }
    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
      throw new IOException(
          "Replica generation stamp < block generation stamp, block="
          + block + ", replica=" + replica);
    } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
      block.setGenerationStamp(replica.getGenerationStamp());
    }
  }

  File datafile = getBlockFile(block);
  File metafile = FsDatasetUtil.getMetaFile(datafile,
      block.getGenerationStamp());
  BlockLocalPathInfo info = new BlockLocalPathInfo(block,
      datafile.getAbsolutePath(), metafile.getAbsolutePath());
  return info;
}
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  PacketHeader hdr = new PacketHeader(
      8,                   // size of packet
      block.getNumBytes(), // OffsetInBlock
      100,                 // sequencenumber
      true,                // lastPacketInBlock
      0,                   // chunk length
      false);              // sync block
  hdr.write(sendOut);
  sendOut.writeInt(0);     // zero checksum

  //ok finally write a block with 0 len
  sendResponse(Status.SUCCESS, "", null, recvOut);
  new PipelineAck(100, new int[] {PipelineAck.combineHeader
      (PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write(recvOut);
  sendRecvData(description, false);
}
private static ExtendedBlock[][] generateBlocks(Suite s, long size)
    throws IOException, InterruptedException, TimeoutException {
  final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
  for (int n = 0; n < s.clients.length; n++) {
    final long fileLen = size / s.replication;
    createFile(s, n, fileLen);

    final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(
        FILE_NAME, 0, fileLen).getLocatedBlocks();

    final int numOfBlocks = locatedBlocks.size();
    blocks[n] = new ExtendedBlock[numOfBlocks];
    for (int i = 0; i < numOfBlocks; i++) {
      final ExtendedBlock b = locatedBlocks.get(i).getBlock();
      blocks[n][i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
          b.getNumBytes(), b.getGenerationStamp());
    }
  }
  return blocks;
}
/** Sync two replicas */
private void testSyncReplicas(ReplicaRecoveryInfo replica1,
    ReplicaRecoveryInfo replica2, InterDatanodeProtocol dn1,
    InterDatanodeProtocol dn2, long expectLen) throws IOException {
  DatanodeInfo[] locs = new DatanodeInfo[]{
      mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
  RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
  ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
  BlockRecord record1 = new BlockRecord(
      DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn1, replica1);
  BlockRecord record2 = new BlockRecord(
      DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2);
  syncList.add(record1);
  syncList.add(record2);

  when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
      anyLong(), anyLong())).thenReturn("storage1");
  when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
      anyLong(), anyLong())).thenReturn("storage2");
  dn.syncBlock(rBlock, syncList);
}
/**
 * The client would like to obtain an additional block for the indicated
 * filename (which is being written to). Return an array that consists
 * of the block, plus a set of machines. The first on this list should
 * be where the client writes data. Subsequent items in the list must
 * be provided in the connection to the first datanode.
 *
 * Make sure the previous blocks have been reported by datanodes and
 * are replicated. Will return an empty 2-elt array if we want the
 * client to "try again later".
 */
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
    ExtendedBlock previous, Set<Node> excludedNodes,
    List<String> favoredNodes) throws IOException {
  LocatedBlock[] onRetryBlock = new LocatedBlock[1];
  DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId,
      clientName, previous, excludedNodes, favoredNodes, onRetryBlock);
  if (targets == null) {
    assert onRetryBlock[0] != null : "Retry block is null";
    // This is a retry. Just return the last block.
    return onRetryBlock[0];
  }
  LocatedBlock newBlock = storeAllocatedBlock(
      src, fileId, clientName, previous, targets);
  return newBlock;
}
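The single-element `onRetryBlock` array is a common Java idiom for returning a second value from a method. A minimal standalone sketch of the same pattern, with hypothetical names only:

// Sketch of the single-element-array "out parameter" idiom used by
// getAdditionalBlock above. All names here are hypothetical.
public class OutParamSketch {
  // Returns null on a detected retry and passes the cached result back
  // through retryOut[0], mirroring how getNewBlockTargets signals a retry.
  static String compute(boolean isRetry, String[] retryOut) {
    if (isRetry) {
      retryOut[0] = "cached-result";
      return null;
    }
    return "fresh-result";
  }

  public static void main(String[] args) {
    String[] retryOut = new String[1];
    String result = compute(true, retryOut);
    if (result == null) {
      result = retryOut[0]; // this is a retry: reuse the previous answer
    }
    System.out.println(result); // prints cached-result
  }
}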
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
    final ExtendedBlock fakeBlock,
    final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
  final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
  final ArgumentCaptor<StorageReceivedDeletedBlocks[]> captor =
      ArgumentCaptor.forClass(StorageReceivedDeletedBlocks[].class);
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      try {
        Mockito.verify(mockNN).blockReceivedAndDeleted(
            Mockito.<DatanodeRegistration>anyObject(),
            Mockito.eq(fakeBlockPoolId),
            captor.capture());
        return true;
      } catch (Throwable t) {
        return false;
      }
    }
  }, 100, 10000);
  return captor.getValue()[0].getBlocks();
}
@Override
public ReplicaInPipelineInterface convertTemporaryToRbw(
    ExtendedBlock temporary) throws IOException {
  final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
  if (map == null) {
    throw new IOException("Block pool not found, temporary=" + temporary);
  }
  final BInfo r = map.get(temporary.getLocalBlock());
  if (r == null) {
    throw new IOException("Block not found, temporary=" + temporary);
  } else if (r.isFinalized()) {
    throw new IOException("Replica already finalized, temporary="
        + temporary + ", r=" + r);
  }
  return r;
}
/**
 * Mark the block belonging to datanode as corrupt.
 * @param blk Block to be marked as corrupt
 * @param dn Datanode which holds the corrupt replica
 * @param storageID if known, null otherwise.
 * @param reason a textual reason why the block should be marked corrupt,
 *        for logging purposes
 */
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
    final DatanodeInfo dn, String storageID, String reason)
    throws IOException {
  assert namesystem.hasWriteLock();
  final BlockInfoContiguous storedBlock = getStoredBlock(blk.getLocalBlock());
  if (storedBlock == null) {
    // Check if the replica is in the blockMap; if not,
    // ignore the request for now. This could happen when the BlockScanner
    // thread of a Datanode reports a bad block before block reports are
    // sent by the Datanode on startup.
    blockLog.info("BLOCK* findAndMarkBlockAsCorrupt: {} not found", blk);
    return;
  }

  DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
  if (node == null) {
    throw new IOException("Cannot mark " + blk
        + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
        + ") does not exist");
  }

  markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
          blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
      storageID == null ? null : node.getStorageInfo(storageID),
      node);
}
@Test
public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget()
    throws IOException {
  INodeFile file = mockFileUnderConstruction();
  Block block = new Block(blockId, length, genStamp);
  FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
  DatanodeID[] newTargets = new DatanodeID[]{
      new DatanodeID("0.0.0.0", "nonexistantHost", "1", 0, 0, 0, 0)};

  ExtendedBlock lastBlock = new ExtendedBlock();
  namesystemSpy.commitBlockSynchronization(
      lastBlock, genStamp, length, true, false, newTargets, null);

  // Repeat the call to make sure it returns true
  namesystemSpy.commitBlockSynchronization(
      lastBlock, genStamp, length, true, false, newTargets, null);
}
/**
 * Mark a block as "suspect."
 *
 * This means that we should try to rescan it soon. Note that the
 * VolumeScanner keeps a list of recently suspicious blocks, which
 * it uses to avoid rescanning the same block over and over in a short
 * time frame.
 *
 * @param storageId The ID of the storage where the block replica
 *                  is being stored.
 * @param block     The block's ID and block pool id.
 */
synchronized void markSuspectBlock(String storageId, ExtendedBlock block) {
  if (!isEnabled()) {
    LOG.info("Not scanning suspicious block {} on {}, because the block " +
        "scanner is disabled.", block, storageId);
    return;
  }
  VolumeScanner scanner = scanners.get(storageId);
  if (scanner == null) {
    // This could happen if the volume is in the process of being removed.
    // The removal process shuts down the VolumeScanner, but the volume
    // object stays around as long as there are references to it (which
    // should not be that long.)
    LOG.info("Not scanning suspicious block {} on {}, because there is no " +
        "volume scanner for that storageId.", block, storageId);
    return;
  }
  scanner.markSuspectBlock(block);
}
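The Javadoc mentions that the VolumeScanner remembers recently suspicious blocks to avoid tight rescan loops. A generic, self-contained sketch of such a cooldown filter; the class, names, and window length are hypothetical, not the VolumeScanner's actual implementation:

import java.util.HashMap;
import java.util.Map;

// Hypothetical "recently suspect" filter: remember when each block was last
// flagged and skip re-flagging it within a cooldown window.
public class RecentSuspectFilter {
  private final Map<Long, Long> lastFlagged = new HashMap<>(); // blockId -> ms
  private final long cooldownMs;

  RecentSuspectFilter(long cooldownMs) {
    this.cooldownMs = cooldownMs;
  }

  // Returns true only the first time a block is flagged within the window.
  synchronized boolean shouldRescan(long blockId) {
    long now = System.currentTimeMillis();
    Long last = lastFlagged.get(blockId);
    if (last != null && now - last < cooldownMs) {
      return false; // seen recently; avoid rescanning in a tight loop
    }
    lastFlagged.put(blockId, now);
    return true;
  }
}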
/**
 * Test writes a file and closes it.
 * A block report is then generated with one extra (bogus) block,
 * the report is forced, and the count of pending-deletion
 * blocks is checked.
 *
 * @throws IOException in case of an error
 */
@Test(timeout=300000)
public void blockReport_04() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path filePath = new Path("/" + METHOD_NAME + ".dat");
  DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR,
      rand.nextLong());

  DataNode dn = cluster.getDataNodes().get(DN_N0);
  // all blocks belong to the same file, hence same BP
  String poolId = cluster.getNamesystem().getBlockPoolId();

  // Create a bogus new block which will not be present on the namenode.
  ExtendedBlock b = new ExtendedBlock(
      poolId, rand.nextLong(), 1024L, rand.nextLong());
  dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);

  DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
  StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
  sendBlockReports(dnR, poolId, reports);
  printStats();

  assertThat("Wrong number of corrupt blocks",
      cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
  assertThat("Wrong number of PendingDeletion blocks",
      cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
}
private void waitForTrueReplication(final MiniDFSCluster cluster,
    final ExtendedBlock block, final int waitFor) throws Exception {
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      try {
        return getTrueReplication(cluster, block) == waitFor;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }, 500, 10000);
}
private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
    ExtendedBlock block,
    EnumSet<BlockTokenSecretManager.AccessMode> accessModes)
    throws IOException {
  Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
  BlockTokenIdentifier id = sm.createIdentifier();
  id.readFields(new DataInputStream(new ByteArrayInputStream(token
      .getIdentifier())));
  return id;
}
@Override // FsDatasetSpi
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
    long recoveryId, long newBlockId, long newlength) {
  // Caller does not care about the exact Storage UUID returned.
  return datanodeUuid;
}
/**
 * Add corrupted block replica into map.
 */
private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
  Set<DatanodeInfo> dnSet = null;
  if (corruptedBlockMap.containsKey(blk)) {
    dnSet = corruptedBlockMap.get(blk);
  } else {
    dnSet = new HashSet<DatanodeInfo>();
  }
  if (!dnSet.contains(node)) {
    dnSet.add(node);
    corruptedBlockMap.put(blk, dnSet);
  }
}
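On Java 8 and later, the same put-if-absent logic can be expressed with the standard Map.computeIfAbsent; a minimal equivalent sketch, not the HDFS implementation:

// Java 8+ equivalent of addIntoCorruptedBlockMap; a sketch only.
private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
  // Creates the set on first use, then adds the node (Set ignores duplicates).
  corruptedBlockMap.computeIfAbsent(blk, k -> new HashSet<>()).add(node);
}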
public void removeBlocks(MiniDFSCluster cluster)
    throws AccessControlException, FileNotFoundException,
    UnresolvedLinkException, IOException {
  for (int corruptIdx : blocksToCorrupt) {
    // Corrupt a block by deleting it
    ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(
        name, blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
    for (int i = 0; i < numDataNodes; i++) {
      File blockFile = cluster.getBlockFile(i, block);
      if (blockFile != null && blockFile.exists()) {
        assertTrue(blockFile.delete());
      }
    }
  }
}
/**
 * Complete the block write!
 */
@Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
  if (Thread.interrupted()) {
    // Don't allow data modifications from interrupted threads
    throw new IOException("Cannot finalize block from Interrupted Thread");
  }
  ReplicaInfo replicaInfo = getReplicaInfo(b);
  if (replicaInfo.getState() == ReplicaState.FINALIZED) {
    // this is legal, when recovery happens on a file that has
    // been opened for append but never modified
    return;
  }
  finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
/**
 * Get the block data file for a block from a given datanode.
 * @param dnIndex Index of the datanode to get block files for
 * @param block block for which corresponding files are needed
 */
public File getBlockFile(int dnIndex, ExtendedBlock block) {
  // Check for block file in the two storage directories of the datanode
  for (int i = 0; i <= 1; i++) {
    File storageDir = getStorageDir(dnIndex, i);
    File blockFile = getBlockFile(storageDir, block);
    if (blockFile.exists()) {
      return blockFile;
    }
  }
  return null;
}
/** Generate a block token for a specified user */
public Token<BlockTokenIdentifier> generateToken(String userId,
    ExtendedBlock block, EnumSet<AccessMode> modes) throws IOException {
  BlockTokenIdentifier id = new BlockTokenIdentifier(userId,
      block.getBlockPoolId(), block.getBlockId(), modes);
  return new Token<BlockTokenIdentifier>(id, this);
}
/**
 * Check if access should be allowed. userID is not checked if null. This
 * method doesn't check if token password is correct. It should be used only
 * when token password has already been verified (e.g., in the RPC layer).
 */
public void checkAccess(BlockTokenIdentifier id, String userId,
    ExtendedBlock block, AccessMode mode) throws InvalidToken {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Checking access for user=" + userId + ", block=" + block
        + ", access mode=" + mode + " using " + id.toString());
  }
  if (userId != null && !userId.equals(id.getUserId())) {
    throw new InvalidToken("Block token with " + id.toString()
        + " doesn't belong to user " + userId);
  }
  if (!id.getBlockPoolId().equals(block.getBlockPoolId())) {
    throw new InvalidToken("Block token with " + id.toString()
        + " doesn't apply to block " + block);
  }
  if (id.getBlockId() != block.getBlockId()) {
    throw new InvalidToken("Block token with " + id.toString()
        + " doesn't apply to block " + block);
  }
  if (isExpired(id.getExpiryDate())) {
    throw new InvalidToken("Block token with " + id.toString()
        + " is expired.");
  }
  if (!id.getAccessModes().contains(mode)) {
    throw new InvalidToken("Block token with " + id.toString()
        + " doesn't have " + mode + " permission");
  }
}
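A hypothetical test-style fragment showing how this identifier-level check pairs with the token generation above. It assumes a configured BlockTokenSecretManager `sm` and an ExtendedBlock `block`, and only uses calls already shown in this section (generateToken, createIdentifier, readFields, checkAccess); it is a sketch, not HDFS test code:

// Hedged usage sketch: sm and block are assumed to exist.
Token<BlockTokenIdentifier> token =
    sm.generateToken("user1", block, EnumSet.of(AccessMode.READ));
BlockTokenIdentifier id = sm.createIdentifier();
id.readFields(new DataInputStream(
    new ByteArrayInputStream(token.getIdentifier())));
// Succeeds: same user, same block pool and ID, READ mode was granted.
sm.checkAccess(id, "user1", block, AccessMode.READ);
try {
  // Fails: WRITE was never included in the token's access modes.
  sm.checkAccess(id, "user1", block, AccessMode.WRITE);
} catch (InvalidToken expected) {
  // expected: token doesn't have WRITE permission
}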
/**
 * BlockRecoveryFI_11. a replica's recovery id does not match new GS.
 *
 * @throws IOException in case of an error
 */
@Test
public void testNotMatchedReplicaID() throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }
  ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
      StorageType.DEFAULT, block, false).getReplica();
  ReplicaOutputStreams streams = null;
  try {
    streams = replicaInfo.createStreams(true,
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
    streams.getChecksumOut().write('a');
    dn.data.initReplicaRecovery(new RecoveringBlock(block, null,
        RECOVERY_ID + 1));
    try {
      dn.syncBlock(rBlock, initBlockRecords(dn));
      fail("Sync should fail");
    } catch (IOException e) {
      // verify that the failure is the expected recovery-id mismatch
      assertTrue(e.getMessage().startsWith("Cannot recover "));
    }
    DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
    verify(namenode, never()).commitBlockSynchronization(
        any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
        anyBoolean(), any(DatanodeID[].class), any(String[].class));
  } finally {
    if (streams != null) {
      streams.close();
    }
  }
}
/**
 * After a block becomes finalized, a datanode increases metric counter,
 * notifies namenode, and adds it to the block scanner.
 * @param block block to close
 * @param delHint hint on which excess block to delete
 * @param storageUuid UUID of the storage where block is stored
 */
void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
  metrics.incrBlocksWritten();
  BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
  if (bpos != null) {
    bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
  } else {
    LOG.warn("Cannot find BPOfferService for reporting block received "
        + "for bpid=" + block.getBlockPoolId());
  }
}
/**
 * The corrupt block has to be removed when the number of valid replicas
 * matches the replication factor for the file. Here this condition is
 * tested by reducing the replication factor.
 * The test strategy:
 *   Bring up a cluster with 3 DataNodes.
 *   Create a file of replication factor 3.
 *   Corrupt one replica of a block of the file.
 *   Verify that there are still 2 good replicas and 1 corrupt replica
 *     (the corrupt replica should not be removed, since the number of good
 *     replicas (2) is less than the replication factor (3)).
 *   Set the replication factor to 2.
 *   Verify that the corrupt replica is removed
 *     (it should be removed, since the number of good replicas (2) is now
 *     equal to the replication factor (2)).
 */
@Test
public void testWhenDecreasingReplication() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
  conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
      Integer.toString(2));
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
  FileSystem fs = cluster.getFileSystem();
  final FSNamesystem namesystem = cluster.getNamesystem();

  try {
    final Path fileName = new Path("/foo1");
    DFSTestUtil.createFile(fs, fileName, 2, (short) 3, 0L);
    DFSTestUtil.waitReplication(fs, fileName, (short) 3);

    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
    corruptBlock(cluster, fs, fileName, 0, block);

    DFSTestUtil.waitReplication(fs, fileName, (short) 2);

    assertEquals(2, countReplicas(namesystem, block).liveReplicas());
    assertEquals(1, countReplicas(namesystem, block).corruptReplicas());

    namesystem.setReplication(fileName.toString(), (short) 2);

    // wait for 3 seconds so that all block reports are processed.
    try {
      Thread.sleep(3000);
    } catch (InterruptedException ignored) {
    }

    assertEquals(2, countReplicas(namesystem, block).liveReplicas());
    assertEquals(0, countReplicas(namesystem, block).corruptReplicas());
  } finally {
    cluster.shutdown();
  }
}
private static void logRecoverBlock(String who, RecoveringBlock rb) {
  ExtendedBlock block = rb.getBlock();
  DatanodeInfo[] targets = rb.getLocations();

  LOG.info(who + " calls recoverBlock(" + block
      + ", targets=[" + Joiner.on(", ").join(targets) + "]"
      + ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
}
/**
 * Sets the offset in the meta file so that the
 * last checksum will be overwritten.
 */
@Override // FsDatasetSpi
public void adjustCrcChannelPosition(ExtendedBlock b,
    ReplicaOutputStreams streams, int checksumSize) throws IOException {
  FileOutputStream file = (FileOutputStream)streams.getChecksumOut();
  FileChannel channel = file.getChannel();
  long oldPos = channel.position();
  long newPos = oldPos - checksumSize;
  if (LOG.isDebugEnabled()) {
    LOG.debug("Changing meta file offset of block " + b + " from "
        + oldPos + " to " + newPos);
  }
  channel.position(newPos);
}
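The method steps the channel position back by one checksum so the next write overwrites the stale value in place. A self-contained, pure-JDK sketch of that seek-back-and-overwrite pattern; the file name and sizes are hypothetical:

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Standalone sketch of the "seek back and overwrite the tail" pattern used
// by adjustCrcChannelPosition above.
public class RewindSketch {
  public static void main(String[] args) throws IOException {
    final int checksumSize = 4; // pretend the last 4 bytes are a checksum
    try (FileOutputStream out = new FileOutputStream("demo.bin")) {
      FileChannel channel = out.getChannel();
      channel.write(ByteBuffer.wrap(
          "payloadCRC1".getBytes(StandardCharsets.US_ASCII)));
      // Step the position back over the stale checksum...
      channel.position(channel.position() - checksumSize);
      // ...so the next write overwrites it in place.
      channel.write(ByteBuffer.wrap(
          "CRC2".getBytes(StandardCharsets.US_ASCII)));
    }
    // demo.bin now contains "payloadCRC2".
  }
}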
@Test(timeout=120000)
public void testFadviseAfterWriteThenRead() throws Exception {
  // start a cluster
  LOG.info("testFadviseAfterWriteThenRead");
  tracker.clear();
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = null;
  String TEST_PATH = "/test";
  int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
        .build();
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();

    // create new file
    createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, true);
    // verify that we dropped everything from the cache during file creation.
    ExtendedBlock block = cluster.getNameNode().getRpcServer()
        .getBlockLocations(TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
    String fadvisedFileName = cluster.getBlockFile(0, block).getName();
    Stats stats = tracker.getStats(fadvisedFileName);
    stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
    stats.clear();

    // read file
    readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, true);
    // verify that we dropped everything from the cache.
    Assert.assertNotNull(stats);
    stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Transfer a replica to the datanode targets.
 * @param b the block to transfer.
 *          The corresponding replica must be an RBW or a Finalized.
 *          Its GS and numBytes will be set to
 *          the stored GS and the visible length.
 * @param targets targets to transfer the block to
 * @param client client name
 */
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
    final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
    final String client) throws IOException {
  final long storedGS;
  final long visible;
  final BlockConstructionStage stage;

  //get replica information
  synchronized(data) {
    Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
        b.getBlockId());
    if (null == storedBlock) {
      throw new IOException(b + " not found in datanode.");
    }
    storedGS = storedBlock.getGenerationStamp();
    if (storedGS < b.getGenerationStamp()) {
      throw new IOException(storedGS
          + " = storedGS < b.getGenerationStamp(), b=" + b);
    }
    // Update the genstamp with storedGS
    b.setGenerationStamp(storedGS);
    if (data.isValidRbw(b)) {
      stage = BlockConstructionStage.TRANSFER_RBW;
    } else if (data.isValidBlock(b)) {
      stage = BlockConstructionStage.TRANSFER_FINALIZED;
    } else {
      final String r = data.getReplicaString(b.getBlockPoolId(),
          b.getBlockId());
      throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
    }
    visible = data.getReplicaVisibleLength(b);
  }
  //set visible length
  b.setNumBytes(visible);

  if (targets.length > 0) {
    new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
  }
}
/**
 * Update a pipeline for a block under construction.
 *
 * @param clientName the name of the client
 * @param oldBlock the old block
 * @param newBlock a new block with a new generation stamp and length
 * @param newNodes datanodes in the pipeline
 * @throws IOException if any error occurs
 */
void updatePipeline(
    String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
    DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
    throws IOException {
  checkOperation(OperationCategory.WRITE);

  LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
      + ", newGS=" + newBlock.getGenerationStamp()
      + ", newLength=" + newBlock.getNumBytes()
      + ", newNodes=" + Arrays.asList(newNodes)
      + ", client=" + clientName
      + ")");
  waitForLoadingFSImage();
  writeLock();
  try {
    checkOperation(OperationCategory.WRITE);
    checkNameNodeSafeMode("Pipeline not updated");
    assert newBlock.getBlockId() == oldBlock.getBlockId()
        : newBlock + " and " + oldBlock + " has different block identifier";
    updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
        newStorageIDs, logRetryCache);
  } finally {
    writeUnlock();
  }
  getEditLog().logSync();
  LOG.info("updatePipeline(" + oldBlock.getLocalBlock() + " => "
      + newBlock.getLocalBlock() + ") success");
}
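Note the ordering: the in-memory state is mutated under the namesystem write lock, while the edit-log sync happens only after the lock is released, so slow journal I/O never blocks other writers. A generic sketch of that ordering under stated assumptions (plain java.util.concurrent, hypothetical names, and a StringBuilder standing in for the journal):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Generic lock-then-sync sketch: mutate state under the write lock, flush
// the (stand-in) journal after unlocking. Names are illustrative only.
public class LockThenSyncSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private long pipelineGenStamp;
  private final StringBuilder journal = new StringBuilder();

  void updatePipeline(long newGenStamp) {
    lock.writeLock().lock();
    try {
      pipelineGenStamp = newGenStamp;                     // in-memory update
      journal.append("genStamp=").append(newGenStamp).append('\n');
    } finally {
      lock.writeLock().unlock();
    }
    logSync(); // durable flush happens outside the lock, as in FSNamesystem
  }

  private void logSync() {
    // stand-in for the edit log's group-commit flush
    System.out.print(journal);
  }
}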
@Override // FsDatasetSpi
public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
    throws IOException {
  final Replica replica = getReplicaInfo(block.getBlockPoolId(),
      block.getBlockId());
  if (replica.getGenerationStamp() < block.getGenerationStamp()) {
    throw new IOException(
        "replica.getGenerationStamp() < block.getGenerationStamp(), block="
        + block + ", replica=" + replica);
  }
  return replica.getVisibleLength();
}
/**
 * Corrupt a block on a data node. Replace the block file content with the
 * byte sequence 0, 1, 2, ..., BLOCK_SIZE - 1.
 *
 * @param block
 *          the ExtendedBlock to be corrupted
 * @param dn
 *          the data node where the block needs to be corrupted
 * @throws FileNotFoundException
 * @throws IOException
 */
private static void corruptBlock(final ExtendedBlock block,
    final DataNode dn) throws FileNotFoundException, IOException {
  final File f = DataNodeTestUtils.getBlockFile(
      dn, block.getBlockPoolId(), block.getLocalBlock());
  final RandomAccessFile raFile = new RandomAccessFile(f, "rw");
  final byte[] bytes = new byte[(int) BLOCK_SIZE];
  for (int i = 0; i < BLOCK_SIZE; i++) {
    bytes[i] = (byte) (i);
  }
  raFile.write(bytes);
  raFile.close();
}
@Test(timeout=120000)
public void testCorruptBlockHandling() throws Exception {
  Configuration conf = new Configuration();
  conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
  conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
      TestScanResultHandler.class.getName());
  final TestContext ctx = new TestContext(conf, 1);
  final int NUM_EXPECTED_BLOCKS = 5;
  final int CORRUPT_INDEX = 3;
  ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4);
  ExtendedBlock badBlock = ctx.getFileBlock(0, CORRUPT_INDEX);
  ctx.cluster.corruptBlockOnDataNodes(badBlock);
  final TestScanResultHandler.Info info =
      TestScanResultHandler.getInfo(ctx.volumes.get(0));
  synchronized (info) {
    info.shouldRun = true;
    info.notify();
  }
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      synchronized (info) {
        return info.blocksScanned == NUM_EXPECTED_BLOCKS;
      }
    }
  }, 3, 30000);
  synchronized (info) {
    assertTrue(info.badBlocks.contains(badBlock));
    for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
      if (i != CORRUPT_INDEX) {
        ExtendedBlock block = ctx.getFileBlock(0, i);
        assertTrue(info.goodBlocks.contains(block));
      }
    }
  }
  ctx.close();
}
/**
 * Return the File associated with a block, without first
 * checking that it exists. This should be used when the
 * next operation is going to open the file for read anyway,
 * and thus the exists check is redundant.
 *
 * @param touch if true then update the last access timestamp of the
 *              block. Currently used for blocks on transient storage.
 */
private File getBlockFileNoExistsCheck(ExtendedBlock b, boolean touch)
    throws IOException {
  final File f;
  synchronized(this) {
    f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
  }
  if (f == null) {
    throw new IOException("Block " + b + " is not valid");
  }
  return f;
}