/**
 * Create a file with one block and corrupt some/all of the block replicas.
 */
private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
    int corruptBlockCount) throws IOException, AccessControlException,
    FileNotFoundException, UnresolvedLinkException, InterruptedException,
    TimeoutException {
  DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0);
  DFSTestUtil.waitReplication(dfs, filePath, repl);
  // Locate the file blocks by asking the name node
  final LocatedBlocks locatedblocks = dfs.dfs.getNamenode()
      .getBlockLocations(filePath.toString(), 0L, BLOCK_SIZE);
  Assert.assertEquals(repl, locatedblocks.get(0).getLocations().length);
  // The file only has one block
  LocatedBlock lblock = locatedblocks.get(0);
  DatanodeInfo[] datanodeinfos = lblock.getLocations();
  ExtendedBlock block = lblock.getBlock();
  // Corrupt some/all of the block replicas
  for (int i = 0; i < corruptBlockCount; i++) {
    DatanodeInfo dninfo = datanodeinfos[i];
    final DataNode dn = cluster.getDataNode(dninfo.getIpcPort());
    corruptBlock(block, dn);
    LOG.debug("Corrupted block " + block.getBlockName() + " on data node "
        + dninfo);
  }
}
/**
 * Get block location info about a file.
 *
 * getBlockLocations() returns a list of hostnames that store
 * data for a specific file region. It returns a set of hostnames
 * for every block within the indicated region.
 *
 * This function is very useful when writing code that considers
 * data-placement when performing operations. For example, the
 * MapReduce system tries to schedule tasks on the same machines
 * as the data-block the task processes.
 */
public BlockLocation[] getBlockLocations(String src, long start,
    long length) throws IOException, UnresolvedLinkException {
  TraceScope scope = getPathTraceScope("getBlockLocations", src);
  try {
    LocatedBlocks blocks = getLocatedBlocks(src, start, length);
    BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
    HdfsBlockLocation[] hdfsLocations =
        new HdfsBlockLocation[locations.length];
    for (int i = 0; i < locations.length; i++) {
      hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
    }
    return hdfsLocations;
  } finally {
    scope.close();
  }
}
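A minimal sketch of the data-placement use case described in the javadoc above, written against the public FileSystem API rather than DFSClient. The path is a placeholder and imports are omitted to match the surrounding snippets; this is an illustration, not code from the source tree.

// Hedged usage sketch: print the hosts storing each block of a file.
public static void printBlockPlacement(Configuration conf)
    throws IOException {
  FileSystem fs = FileSystem.get(conf);
  Path file = new Path("/user/example/data.dat"); // placeholder path
  FileStatus status = fs.getFileStatus(file);
  BlockLocation[] locations =
      fs.getFileBlockLocations(status, 0, status.getLen());
  for (BlockLocation loc : locations) {
    // Each BlockLocation names the hosts storing one block of the region
    System.out.println(loc.getOffset() + "+" + loc.getLength() + " -> "
        + Arrays.toString(loc.getHosts()));
  }
}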
public static LocatedBlocksProto convert(LocatedBlocks lb) {
  if (lb == null) {
    return null;
  }
  LocatedBlocksProto.Builder builder = LocatedBlocksProto.newBuilder();
  if (lb.getLastLocatedBlock() != null) {
    builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock()));
  }
  if (lb.getFileEncryptionInfo() != null) {
    builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo()));
  }
  return builder.setFileLength(lb.getFileLength())
      .setUnderConstruction(lb.isUnderConstruction())
      .addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks()))
      .setIsLastBlockComplete(lb.isLastBlockComplete()).build();
}
@Override
public GetBlockLocationsResponseProto getBlockLocations(
    RpcController controller, GetBlockLocationsRequestProto req)
    throws ServiceException {
  try {
    LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
        req.getLength());
    Builder builder = GetBlockLocationsResponseProto.newBuilder();
    if (b != null) {
      builder.setLocations(PBHelper.convert(b)).build();
    }
    return builder.build();
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}
@Override
public LocatedBlocks getBlockLocations(String src, long offset, long length)
    throws AccessControlException, FileNotFoundException,
    UnresolvedLinkException, IOException {
  GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
      .newBuilder()
      .setSrc(src)
      .setOffset(offset)
      .setLength(length)
      .build();
  try {
    GetBlockLocationsResponseProto resp =
        rpcProxy.getBlockLocations(null, req);
    return resp.hasLocations() ? PBHelper.convert(resp.getLocations()) : null;
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
public static void checkBlockRecovery(Path p, DistributedFileSystem dfs,
    int attempts, long sleepMs) throws IOException {
  boolean success = false;
  for (int i = 0; i < attempts; i++) {
    LocatedBlocks blocks = getLocatedBlocks(p, dfs);
    boolean noLastBlock = blocks.getLastLocatedBlock() == null;
    if (!blocks.isUnderConstruction() &&
        (noLastBlock || blocks.isLastBlockComplete())) {
      success = true;
      break;
    }
    try {
      Thread.sleep(sleepMs);
    } catch (InterruptedException ignored) {
    }
  }
  assertThat("inode should complete in ~" + sleepMs * attempts + " ms.",
      success, is(true));
}
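A hedged sketch of how this polling helper might be driven from a test: truncating a file mid-block leaves its last block under recovery, and checkBlockRecovery then polls until the file is complete. It assumes a running MiniDFSCluster, that checkBlockRecovery is visible in scope, and the path and retry values are illustrative.

// Hedged usage sketch for checkBlockRecovery (illustrative values).
public static void truncateAndWaitForRecovery(DistributedFileSystem dfs)
    throws IOException {
  Path p = new Path("/testTruncateRecovery.dat"); // placeholder path
  DFSTestUtil.createFile(dfs, p, 4 * 1024, (short) 3, 0);
  // Truncating mid-block returns false and leaves the last block under
  // recovery; truncating on a block boundary completes immediately
  boolean completedImmediately = dfs.truncate(p, 2 * 1024);
  if (!completedImmediately) {
    // Poll up to 100 times at 300 ms intervals
    checkBlockRecovery(p, dfs, 100, 300L);
  }
}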
@Override
void prepare() throws Exception {
  final Path filePath = new Path(file);
  DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
  // Append to the file and leave the last block under construction
  out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND),
      null, null);
  byte[] appendContent = new byte[100];
  new Random().nextBytes(appendContent);
  out.write(appendContent);
  ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
  LocatedBlocks blks = dfs.getClient()
      .getLocatedBlocks(file, BlockSize + 1);
  assertEquals(1, blks.getLocatedBlocks().size());
  nodes = blks.get(0).getLocations();
  oldBlock = blks.get(0).getBlock();
  LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline(
      oldBlock, client.getClientName());
  newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(),
      oldBlock.getBlockId(), oldBlock.getNumBytes(),
      newLbk.getBlock().getGenerationStamp());
}
protected final boolean verifyBlockDeletedFromDir(File dir,
    LocatedBlocks locatedBlocks) {
  for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
    File targetDir =
        DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
    File blockFile = new File(targetDir, lb.getBlock().getBlockName());
    if (blockFile.exists()) {
      LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
          " exists after deletion.");
      return false;
    }
    File metaFile = new File(targetDir,
        DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
            lb.getBlock().getGenerationStamp()));
    if (metaFile.exists()) {
      LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
          " exists after deletion.");
      return false;
    }
  }
  return true;
}
@Test
public void testLazyPersistBlocksAreSaved()
    throws IOException, InterruptedException {
  startUpCluster(true, -1);
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path = new Path("/" + METHOD_NAME + ".dat");
  // Create a test file
  makeTestFile(path, BLOCK_SIZE * 10, true);
  LocatedBlocks locatedBlocks =
      ensureFileReplicasOnStorageType(path, RAM_DISK);
  // Sleep for a short time to allow the lazy writer thread to do its job
  Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
  LOG.info("Verifying copy was saved to lazyPersist/");
  // Make sure that there is a saved copy of the replica on persistent
  // storage.
  ensureLazyPersistBlocksAreSaved(locatedBlocks);
}
/**
 * Delete a lazy-persist file that has not been persisted to disk.
 * Memory is freed up and the file is gone.
 * @throws IOException
 */
@Test
public void testDeleteBeforePersist() throws Exception {
  startUpCluster(true, -1);
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
  Path path = new Path("/" + METHOD_NAME + ".dat");
  makeTestFile(path, BLOCK_SIZE, true);
  LocatedBlocks locatedBlocks =
      ensureFileReplicasOnStorageType(path, RAM_DISK);
  // Delete before persist
  client.delete(path.toString(), false);
  Assert.assertFalse(fs.exists(path));
  assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
  verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 1);
}
/**
 * Delete a lazy-persist file that has been persisted to disk.
 * Both memory blocks and disk blocks are deleted.
 * @throws IOException
 * @throws InterruptedException
 */
@Test
public void testDeleteAfterPersist() throws Exception {
  startUpCluster(true, -1);
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path = new Path("/" + METHOD_NAME + ".dat");
  makeTestFile(path, BLOCK_SIZE, true);
  LocatedBlocks locatedBlocks =
      ensureFileReplicasOnStorageType(path, RAM_DISK);
  // Sleep for a short time to allow the lazy writer thread to do its job
  Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
  // Delete after persist
  client.delete(path.toString(), false);
  Assert.assertFalse(fs.exists(path));
  assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
  verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
  verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
}
private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
  LocatedBlock goodLocatedBlock = goodBlockList.get(0);
  LocatedBlock badLocatedBlock = new LocatedBlock(
      goodLocatedBlock.getBlock(),
      new DatanodeInfo[] {
        DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234)
      },
      goodLocatedBlock.getStartOffset(),
      false);
  List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
  badBlocks.add(badLocatedBlock);
  return new LocatedBlocks(goodBlockList.getFileLength(), false,
      badBlocks, null, true, null);
}
private void testFromDFS(DistributedFileSystem dfs, String src, int repCount,
    String localhost) throws Exception {
  // Multiple times as the order is random
  for (int i = 0; i < 10; i++) {
    LocatedBlocks l;
    // The NN gets the block list asynchronously, so we may need multiple
    // tries to get the list
    final long max = System.currentTimeMillis() + 10000;
    boolean done;
    do {
      Assert.assertTrue("Can't get enough replicas.",
          System.currentTimeMillis() < max);
      l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
      Assert.assertNotNull("Can't get block locations for " + src, l);
      Assert.assertNotNull(l.getLocatedBlocks());
      Assert.assertTrue(l.getLocatedBlocks().size() > 0);
      done = true;
      for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
        done = (l.get(y).getLocations().length == repCount);
      }
    } while (!done);
    for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
      Assert.assertEquals(localhost,
          l.get(y).getLocations()[repCount - 1].getHostName());
    }
  }
}
/** Convert a JSON map to LocatedBlocks. */
static LocatedBlocks toLocatedBlocks(final Map<?, ?> json)
    throws IOException {
  if (json == null) {
    return null;
  }
  final Map<?, ?> m = (Map<?, ?>) json.get(
      LocatedBlocks.class.getSimpleName());
  final long fileLength = ((Number) m.get("fileLength")).longValue();
  final boolean isUnderConstruction = (Boolean) m.get("isUnderConstruction");
  final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
      getList(m, "locatedBlocks"));
  final LocatedBlock lastLocatedBlock = toLocatedBlock(
      (Map<?, ?>) m.get("lastLocatedBlock"));
  final boolean isLastBlockComplete = (Boolean) m.get("isLastBlockComplete");
  return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
      lastLocatedBlock, isLastBlockComplete, null, null);
}
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
    ErasureCodingPolicy ecPolicy, LocatedBlocks locatedBlocks)
    throws IOException {
  super(dfsClient, src, verifyChecksum, locatedBlocks);
  assert ecPolicy != null;
  this.ecPolicy = ecPolicy;
  this.cellSize = ecPolicy.getCellSize();
  dataBlkNum = (short) ecPolicy.getNumDataUnits();
  parityBlkNum = (short) ecPolicy.getNumParityUnits();
  groupSize = dataBlkNum + parityBlkNum;
  blockReaders = new BlockReaderInfo[groupSize];
  curStripeRange = new StripeRange(0, 0);
  readingService = new ExecutorCompletionService<>(
      dfsClient.getStripedReadsThreadPool());
  decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(),
      dataBlkNum, parityBlkNum);
  if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("Creating a striped input stream for file " + src);
  }
}
private void collectFileSummary(String path, HdfsFileStatus file, Result res,
    LocatedBlocks blocks) throws IOException {
  long fileLen = file.getLen();
  boolean isOpen = blocks.isUnderConstruction();
  if (isOpen && !showOpenFiles) {
    // We collect these stats about open files to report with default options
    res.totalOpenFilesSize += fileLen;
    res.totalOpenFilesBlocks += blocks.locatedBlockCount();
    res.totalOpenFiles++;
    return;
  }
  res.totalFiles++;
  res.totalSize += fileLen;
  res.totalBlocks += blocks.locatedBlockCount();
  if (showOpenFiles && isOpen) {
    out.print(path + " " + fileLen + " bytes, " +
        blocks.locatedBlockCount() + " block(s), OPENFORWRITE: ");
  } else if (showFiles) {
    out.print(path + " " + fileLen + " bytes, " +
        blocks.locatedBlockCount() + " block(s): ");
  } else if (showprogress) {
    out.print('.');
  }
}
@Override
public GetBlockLocationsResponseProto getBlockLocations(
    RpcController controller, GetBlockLocationsRequestProto req)
    throws ServiceException {
  try {
    LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
        req.getLength());
    Builder builder = GetBlockLocationsResponseProto.newBuilder();
    if (b != null) {
      builder.setLocations(PBHelperClient.convert(b)).build();
    }
    return builder.build();
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}
private void checkFile(DistributedFileSystem fileSys, Path name)
    throws IOException {
  BlockLocation[] locations = fileSys.getFileBlockLocations(
      fileSys.getFileStatus(name), 0, fileSize);
  assertEquals("Number of blocks", fileSize, locations.length);
  FSDataInputStream stm = fileSys.open(name);
  byte[] expected = new byte[fileSize];
  if (simulatedStorage) {
    LocatedBlocks lbs = fileSys.getClient().getLocatedBlocks(name.toString(),
        0, fileSize);
    DFSTestUtil.fillExpectedBuf(lbs, expected);
  } else {
    Random rand = new Random(seed);
    rand.nextBytes(expected);
  }
  // Do a sanity check: read the file
  byte[] actual = new byte[fileSize];
  stm.readFully(0, actual);
  checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  stm.close();
}
/**
 * Make sure the IDs of striped blocks do not conflict.
 */
@Test
public void testAllocateBlockId() throws Exception {
  Path testPath = new Path("/testfile");
  // Create a file, which allocates a new block
  DFSTestUtil.writeFile(dfs, testPath, "hello, world!");
  LocatedBlocks lb = dfs.getClient().getLocatedBlocks(testPath.toString(), 0);
  final long firstId = lb.get(0).getBlock().getBlockId();
  // Delete the file
  dfs.delete(testPath, true);
  // Allocate a new block, and make sure the new block's ID does not
  // conflict with the previous one
  DFSTestUtil.writeFile(dfs, testPath, "hello again");
  lb = dfs.getClient().getLocatedBlocks(testPath.toString(), 0);
  final long secondId = lb.get(0).getBlock().getBlockId();
  Assert.assertEquals(firstId + HdfsServerConstants.MAX_BLOCKS_IN_GROUP,
      secondId);
}
@Test
public void testLazyPersistBlocksAreSaved()
    throws IOException, InterruptedException, TimeoutException {
  getClusterBuilder().build();
  final int NUM_BLOCKS = 10;
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path = new Path("/" + METHOD_NAME + ".dat");
  // Create a test file
  makeTestFile(path, BLOCK_SIZE * NUM_BLOCKS, true);
  LocatedBlocks locatedBlocks =
      ensureFileReplicasOnStorageType(path, RAM_DISK);
  waitForMetric("RamDiskBlocksLazyPersisted", NUM_BLOCKS);
  LOG.info("Verifying copy was saved to lazyPersist/");
  // Make sure that there is a saved copy of the replica on persistent
  // storage.
  ensureLazyPersistBlocksAreSaved(locatedBlocks);
}
/**
 * Delete a lazy-persist file that has been persisted to disk.
 * Both memory blocks and disk blocks are deleted.
 * @throws IOException
 * @throws InterruptedException
 */
@Test
public void testDeleteAfterPersist() throws Exception {
  getClusterBuilder().build();
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path path = new Path("/" + METHOD_NAME + ".dat");
  makeTestFile(path, BLOCK_SIZE, true);
  LocatedBlocks locatedBlocks =
      ensureFileReplicasOnStorageType(path, RAM_DISK);
  waitForMetric("RamDiskBlocksLazyPersisted", 1);
  // Delete after persist
  client.delete(path.toString(), false);
  Assert.assertFalse(fs.exists(path));
  assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
  verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
  verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
}
/**
 * Verify that the blocks in a striped block group are on different nodes,
 * and that every internal block exists.
 */
public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
    int groupSize) {
  for (LocatedBlock lb : lbs.getLocatedBlocks()) {
    assert lb instanceof LocatedStripedBlock;
    HashSet<DatanodeInfo> locs = new HashSet<>();
    Collections.addAll(locs, lb.getLocations());
    assertEquals(groupSize, lb.getLocations().length);
    assertEquals(groupSize, locs.size());
    // Verify that every internal block exists
    byte[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
    assertEquals(groupSize, blockIndices.length);
    HashSet<Integer> found = new HashSet<>();
    for (int index : blockIndices) {
      assert index >= 0;
      found.add(index);
    }
    assertEquals(groupSize, found.size());
  }
}
private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
  LocatedBlock goodLocatedBlock = goodBlockList.get(0);
  LocatedBlock badLocatedBlock = new LocatedBlock(
      goodLocatedBlock.getBlock(),
      new DatanodeInfo[] {
        DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234)
      });
  badLocatedBlock.setStartOffset(goodLocatedBlock.getStartOffset());
  List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
  badBlocks.add(badLocatedBlock);
  return new LocatedBlocks(goodBlockList.getFileLength(), false,
      badBlocks, null, true, null, null);
}
/**
 * Convert a LocatedBlocks to BlockLocation[].
 * @param blocks a LocatedBlocks
 * @return an array of BlockLocations
 */
public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
  if (blocks == null) {
    return new BlockLocation[0];
  }
  return locatedBlocks2Locations(blocks.getLocatedBlocks());
}
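A short, hedged sketch of this conversion in context: fetch the client-protocol view with DFSClient#getLocatedBlocks (shown earlier in this collection) and flatten it to the public BlockLocation[] form. The path is a placeholder and the snippet is illustrative, not from the source tree.

// Hedged usage sketch for locatedBlocks2Locations.
public static void printLocationCount(DistributedFileSystem dfs)
    throws IOException {
  String src = "/user/example/data.dat"; // placeholder path
  LocatedBlocks lbs =
      dfs.getClient().getLocatedBlocks(src, 0, Long.MAX_VALUE);
  BlockLocation[] locs = DFSUtil.locatedBlocks2Locations(lbs);
  System.out.println(src + " has " + locs.length + " block location(s)");
}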
/** Fetch a block from the namenode and cache it. */
private void fetchBlockAt(long offset) throws IOException {
  synchronized (infoLock) {
    int targetBlockIdx = locatedBlocks.findBlock(offset);
    if (targetBlockIdx < 0) { // block is not cached
      targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
    }
    // Fetch blocks
    final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
    if (newBlocks == null) {
      throw new IOException("Could not find target position " + offset);
    }
    locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
  }
}
@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
    throws IOException {
  TraceScope scope = getPathTraceScope("getBlockLocations", src);
  try {
    return callGetBlockLocations(namenode, src, start, length);
  } finally {
    scope.close();
  }
}
/**
 * @see ClientProtocol#getBlockLocations(String, long, long)
 */
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length) throws IOException {
  try {
    return namenode.getBlockLocations(src, start, length);
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class,
        UnresolvedPathException.class);
  }
}
@Override // ClientProtocol
public LocatedBlocks getBlockLocations(String src, long offset, long length)
    throws IOException {
  checkNNStartup();
  metrics.incrGetBlockLocations();
  return namesystem.getBlockLocations(getClientMachine(), src, offset,
      length);
}
/** Create a redirection URL. */
private URL createRedirectURL(String path, String encodedPath,
    HdfsFileStatus status, UserGroupInformation ugi, ClientProtocol nnproxy,
    HttpServletRequest request, String dt) throws IOException {
  String scheme = request.getScheme();
  final LocatedBlocks blks = nnproxy.getBlockLocations(
      status.getFullPath(new Path(path)).toUri().getPath(), 0, 1);
  final Configuration conf = NameNodeHttpServer.getConfFromContext(
      getServletContext());
  final DatanodeID host = pickSrcDatanode(blks, status, conf);
  final String hostname;
  if (host instanceof DatanodeInfo) {
    hostname = host.getHostName();
  } else {
    hostname = host.getIpAddr();
  }
  int port = "https".equals(scheme) ? host.getInfoSecurePort()
      : host.getInfoPort();
  String dtParam = "";
  if (dt != null) {
    dtParam = JspHelper.getDelegationTokenUrlParam(dt);
  }
  // Add the namenode address to the URL params
  NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
      getServletContext());
  String addr = nn.getNameNodeAddressHostPortString();
  String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
  return new URL(scheme, hostname, port,
      "/streamFile" + encodedPath + '?' +
      "ugi=" + ServletUtil.encodeQueryValue(ugi.getShortUserName()) +
      dtParam + addrParam);
}
/**
 * Select a datanode to service this request.
 * Currently, this looks at no more than the first five blocks of a file,
 * selecting a datanode randomly from the most represented.
 * @param conf
 */
private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i,
    Configuration conf) throws IOException {
  if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
    // Pick a random datanode
    NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
        getServletContext());
    return NamenodeJspHelper.getRandomDatanode(nn);
  }
  return JspHelper.bestNode(blks, conf);
}