/**
 * Gathers the block files of every datanode in the given mini cluster.
 *
 * The namespace id is read from the cluster's name node and used both to
 * fetch the block reports and to resolve each block to a file. Report
 * {@code i} is resolved against the dataset of datanode {@code i}.
 *
 * @param cluster the cluster whose block files are collected
 * @return the files returned by {@code FSDataset.getBlockFile} for every
 *         reported block, in report order
 * @throws IOException declared by this method; raised by the cluster or
 *         dataset calls it makes
 */
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  final List<DataNode> nodes = cluster.getDataNodes();
  final int namespaceId = cluster.getNameNode().getNamespaceID();
  final Block[][] reports = cluster.getAllBlockReports(namespaceId);

  final List<File> result = new ArrayList<File>();
  int nodeIndex = 0;
  for (Block[] report : reports) {
    // The report at position nodeIndex belongs to the datanode at the same position.
    final FSDataset dataset = (FSDataset) nodes.get(nodeIndex).getFSDataset();
    nodeIndex++;
    for (Block block : report) {
      result.add(dataset.getBlockFile(namespaceId, block));
    }
  }
  return result;
}
/**
 * Gathers the block files of every datanode in the given mini cluster.
 *
 * Report {@code i} returned by {@code getAllBlockReports()} is resolved
 * against the dataset of datanode {@code i}.
 *
 * @param cluster the cluster whose block files are collected
 * @return the files returned by {@code FSDataset.getBlockFile} for every
 *         reported block, in report order
 * @throws IOException declared by this method; raised by the cluster or
 *         dataset calls it makes
 */
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  final List<DataNode> nodes = cluster.getDataNodes();
  final Block[][] reports = cluster.getAllBlockReports();

  final List<File> result = new ArrayList<File>();
  int nodeIndex = 0;
  while (nodeIndex < reports.length) {
    final FSDataset dataset = (FSDataset) nodes.get(nodeIndex).getFSDataset();
    final Block[] report = reports[nodeIndex];
    for (int j = 0; j < report.length; j++) {
      result.add(dataset.getBlockFile(report[j]));
    }
    nodeIndex++;
  }
  return result;
}
/**
 * Gathers the block files of every datanode in the given mini cluster.
 *
 * Each element of the array returned by {@code getAllBlockReports()} is an
 * {@code Iterable} of blocks; element {@code i} is resolved against the
 * dataset of datanode {@code i}.
 *
 * @param cluster the cluster whose block files are collected
 * @return the files returned by {@code FSDataset.getBlockFile} for every
 *         reported block, in report order
 * @throws IOException declared by this method; raised by the cluster or
 *         dataset calls it makes
 */
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  final List<DataNode> nodes = cluster.getDataNodes();
  final Iterable<Block>[] reports = cluster.getAllBlockReports();

  final List<File> result = new ArrayList<File>();
  int nodeIndex = 0;
  for (Iterable<Block> report : reports) {
    // The report at position nodeIndex belongs to the datanode at the same position.
    final FSDataset dataset = (FSDataset) nodes.get(nodeIndex).getFSDataset();
    nodeIndex++;
    for (Block block : report) {
      result.add(dataset.getBlockFile(block));
    }
  }
  return result;
}
public void testParallelCheckDirs() throws Exception { final DataNode datanode = cluster.getDataNodes().get(0); FSDataset fsDataset = (FSDataset) datanode.data; datanode.data = spy(fsDataset); final Method checkDiskMethod = DataNode.class.getDeclaredMethod( "checkDiskError", Exception.class); checkDiskMethod.setAccessible(true); doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } }).when(datanode.data).checkDataDir(); Thread[] threads = new Thread[30]; for (int i = 0; i < 30; i++) { threads[i] = new Thread() { public void run() { try { checkDiskMethod.invoke(datanode, new Exception("Fake Exception")); } catch (IllegalArgumentException e) { TestCase.fail("IllegalArgumentException"); } catch (IllegalAccessException e) { TestCase.fail("IllegalAccessException"); } catch (InvocationTargetException e) { TestCase.fail("InvocationTargetException"); } } }; } // Parallel 10 checks should only have one launched. for (int i = 0; i < 10; i++) { threads[i].start(); } for (int i = 0; i < 10; i++) { threads[i].join(); } verify(datanode.data, times(1)).checkDataDir(); // Next checks won't be launched as one recently finishes. for (int i = 10; i < 20; i++) { threads[i].start(); } for (int i = 10; i < 20; i++) { threads[i].join(); } verify(datanode.data, times(1)).checkDataDir(); // After 2 seconds, another check should be able to run Thread.sleep(2000); for (int i = 20; i < 30; i++) { threads[i].start(); } for (int i = 20; i < 30; i++) { threads[i].join(); } verify(datanode.data, times(2)).checkDataDir(); }
/** * The only way this object can be instantiated. */ static BlockReaderLocal newBlockReader(Configuration conf, String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout, long startOffset, long length) throws IOException { LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); // check the cache first BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); if (pathinfo == null) { pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token); } // check to see if the file exists. It may so happen that the // HDFS file has been deleted and this block-lookup is occurring // on behalf of a new HDFS file. This time, the block file could // be residing in a different portion of the fs.data.dir directory. // In this case, we remove this entry from the cache. The next // call to this method will re-populate the cache. FileInputStream dataIn = null; FileInputStream checksumIn = null; BlockReaderLocal localBlockReader = null; boolean skipChecksum = shortCircuitChecksum(conf); try { // get a local file system File blkfile = new File(pathinfo.getBlockPath()); dataIn = new FileInputStream(blkfile); if (LOG.isDebugEnabled()) { LOG.debug("New BlockReaderLocal for file " + blkfile + " of size " + blkfile.length() + " startOffset " + startOffset + " length " + length + " short circuit checksum " + skipChecksum); } if (!skipChecksum) { // get the metadata file File metafile = new File(pathinfo.getMetaPath()); checksumIn = new FileInputStream(metafile); // read and handle the common header here. 
For now just a version BlockMetadataHeader header = BlockMetadataHeader .readHeader(new DataInputStream(checksumIn)); short version = header.getVersion(); if (version != FSDataset.METADATA_VERSION) { LOG.warn("Wrong version (" + version + ") for metadata file for " + blk + " ignoring ..."); } DataChecksum checksum = header.getChecksum(); localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, checksum, true, dataIn, checksumIn); } else { localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, dataIn); } } catch (IOException e) { // remove from cache localDatanodeInfo.removeBlockLocalPathInfo(blk); DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk + " from cache because local file " + pathinfo.getBlockPath() + " could not be opened."); throw e; } finally { if (localBlockReader == null) { if (dataIn != null) { dataIn.close(); } if (checksumIn != null) { checksumIn.close(); } } } return localBlockReader; }
/** * The only way this object can be instantiated. */ public static BlockReaderLocal newBlockReader(Configuration conf, String file, int namespaceid, Block blk, DatanodeInfo node, long startOffset, long length, DFSClientMetrics metrics, boolean verifyChecksum, boolean clearOsBuffer) throws IOException { // check in cache first BlockPathInfo pathinfo = cache.get(blk); if (pathinfo == null) { // cache the connection to the local data for eternity. if (datanode == null) { datanode = DFSClient.createClientDNProtocolProxy(node, conf, 0); } // make RPC to local datanode to find local pathnames of blocks if (datanode.isMethodSupported("getBlockPathInfo", int.class, Block.class)) { pathinfo = datanode.getProxy().getBlockPathInfo(namespaceid, blk); } else { pathinfo = datanode.getProxy().getBlockPathInfo(blk); } if (pathinfo != null) { cache.put(blk, pathinfo); } } // check to see if the file exists. It may so happen that the // HDFS file has been deleted and this block-lookup is occuring // on behalf of a new HDFS file. This time, the block file could // be residing in a different portion of the fs.data.dir directory. // In this case, we remove this entry from the cache. The next // call to this method will repopulate the cache. try { // get a local file system File blkfile = new File(pathinfo.getBlockPath()); FileInputStream dataIn = new FileInputStream(blkfile); if (LOG.isDebugEnabled()) { LOG.debug("New BlockReaderLocal for file " + blkfile + " of size " + blkfile.length() + " startOffset " + startOffset + " length " + length); } if (verifyChecksum) { // get the metadata file File metafile = new File(pathinfo.getMetaPath()); FileInputStream checksumIn = new FileInputStream(metafile); // read and handle the common header here. 
For now just a version BlockMetadataHeader header = BlockMetadataHeader.readHeader(new DataInputStream(checksumIn), new PureJavaCrc32()); short version = header.getVersion(); if (version != FSDataset.METADATA_VERSION) { LOG.warn("Wrong version (" + version + ") for metadata file for " + blk + " ignoring ..."); } DataChecksum checksum = header.getChecksum(); return new BlockReaderLocal(conf, file, blk, startOffset, length, pathinfo, metrics, checksum, verifyChecksum, dataIn, checksumIn, clearOsBuffer); } else { return new BlockReaderLocal(conf, file, blk, startOffset, length, pathinfo, metrics, dataIn, clearOsBuffer); } } catch (FileNotFoundException e) { cache.remove(blk); // remove from cache DFSClient.LOG.warn("BlockReaderLoca: Removing " + blk + " from cache because local file " + pathinfo.getBlockPath() + " could not be opened."); throw e; } }