private static Configuration createShortCircuitConf(String testName,
    TemporarySocketDirectory sockDir) {
  Configuration conf = new Configuration();
  conf.set(DFS_CLIENT_CONTEXT, testName);
  conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
  conf.set(DFS_DOMAIN_SOCKET_PATH_KEY,
      new File(sockDir.getDir(), testName).getAbsolutePath());
  conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
  conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
  conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  DomainSocket.disableBindPathValidation();
  Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
  return conf;
}
// Same helper as above, but using the newer HdfsClientConfigKeys constants
// for the short-circuit settings.
private static Configuration createShortCircuitConf(String testName,
    TemporarySocketDirectory sockDir) {
  Configuration conf = new Configuration();
  conf.set(DFS_CLIENT_CONTEXT, testName);
  conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
  conf.set(DFS_DOMAIN_SOCKET_PATH_KEY,
      new File(sockDir.getDir(), testName).getAbsolutePath());
  conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
  conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
  conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  DomainSocket.disableBindPathValidation();
  Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
  return conf;
}
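// A minimal sketch of how a configuration from createShortCircuitConf() might be
// used in a test. The test name, file path, and sizes below are illustrative
// assumptions, not taken from the snippets above.
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf("testShortCircuitRead", sockDir);
MiniDFSCluster cluster =
    new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
  cluster.waitActive();
  DistributedFileSystem fs = cluster.getFileSystem();
  Path path = new Path("/testShortCircuitRead");
  DFSTestUtil.createFile(fs, path, 8192, (short) 1, 0xDEADBEEFL);
  // With the config above, this read should go through the short-circuit
  // (domain socket) path rather than a TCP connection to the datanode.
  byte[] contents = DFSTestUtil.readFileBuffer(fs, path);
} finally {
  cluster.shutdown();
  sockDir.close();
}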
private void tryGetLocalFile() {
  if (tryGetLocalFileTimes >= TRY_GET_LOCAL_FILE_LIMIT) {
    return;
  }
  if (isSingleBlock && HDFS_READ_HACK_ENABLE) {
    try {
      InputStream is = input.getWrappedStream();
      if (is instanceof DFSInputStream) {
        BlockReader blockReader = MemoryUtil.getDFSInputStream_blockReader(is);
        if (blockReader != null && blockReader.isShortCircuit()) {
          localFile = MemoryUtil.getBlockReaderLocal_dataIn(blockReader);
        }
      }
    } catch (Throwable e) {
      logger.debug("HDFS READ HACK failed.", e);
    }
  }
  tryGetLocalFileTimes++;
}
/**
 * Lists, and then reads the first byte of, all of the files in
 * this hostname/process' namespace.
 * @return -1 on error, 0 on success
 */
private static int readDFSPaths() {
  if (listDFSPaths() != 0) {
    return -1;
  }
  try {
    for (Map.Entry<String, OutputStream> file : files_.entrySet()) {
      long startTime = System.nanoTime();
      DFSInputStream in = dfsClient_.open(file.getKey());
      timingOpen_.add((System.nanoTime() - startTime) / 1E9);
      in.read();
      in.close();
    }
  } catch (IOException e) {
    e.printStackTrace();
    return -1;  // match the documented contract: -1 on error
  }
  return 0;
}
private void addDirToMaps(Path dir, DFSClient client) throws IOException {
  FileStatus[] children = dfs.listStatus(dir);
  if (children == null) {
    return;
  }
  for (FileStatus child : children) {
    if (!child.isDir()) {
      // Get block ids for the file; paths will be unique.
      Path path = child.getPath();
      fileMap.put(path, new ArrayList<Long>());
      DFSInputStream stm = client.open(child.getPath().toUri().getPath());
      LocatedBlocks blocks = stm.fetchLocatedBlocks();
      stm.close();
      for (int i = 0; i < blocks.locatedBlockCount(); i++) {
        Long blockId = blocks.get(i).getBlock().getBlockId();
        fileMap.get(path).add(blockId);   // add to the file's block list
        blockRefMap.put(blockId, null);   // mark as unreferenced
      }
    } else {
      // If the child is a directory, recurse into it.
      addDirToMaps(child.getPath(), client);
    }
  }
}
private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
  return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {
    @Override
    public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
      DFSClient client = getDfsClient(key.userId);
      DFSInputStream dis = client.open(key.inodePath);
      return client.createWrappedInputStream(dis);
    }
  };
}
@SuppressWarnings("deprecation") @Override public HdfsDataInputStream open(Path f, int bufferSize) throws IOException, UnresolvedLinkException { final DFSInputStream dfsis = dfs.open(getUriPath(f), bufferSize, verifyChecksum); return dfs.createWrappedInputStream(dfsis); }
private byte[] cacheInitialContents() throws IOException {
  HdfsFileStatus status = dfsClient.getFileInfo(name);
  byte[] content = new byte[(int) status.getLen()];
  DFSInputStream in = null;
  try {
    in = dfsClient.open(name);
    IOUtils.readFully(in, content, 0, content.length);
  } finally {
    // Guard against an NPE in case open() itself failed.
    if (in != null) {
      in.close();
    }
  }
  return content;
}
public void checkSalvagedRemains() throws IOException {
  int chainIdx = 0;
  HdfsFileStatus status = dfsClient.getFileInfo(name);
  long length = status.getLen();
  int numBlocks = (int) ((length + blockSize - 1) / blockSize);
  DFSInputStream in = null;
  byte[] blockBuffer = new byte[blockSize];
  try {
    for (int blockIdx = 0; blockIdx < numBlocks; blockIdx++) {
      if (blocksToCorrupt.contains(blockIdx)) {
        if (in != null) {
          in.close();
          in = null;
        }
        continue;
      }
      if (in == null) {
        in = dfsClient.open("/lost+found" + name + "/" + chainIdx);
        chainIdx++;
      }
      int len = blockBuffer.length;
      if (blockIdx == (numBlocks - 1)) {
        // The last block might not be full-length.
        len = (int) (in.getFileLength() % blockSize);
        if (len == 0) {
          len = blockBuffer.length;
        }
      }
      IOUtils.readFully(in, blockBuffer, 0, len);
      int startIdx = blockIdx * blockSize;
      for (int i = 0; i < len; i++) {
        if (initialContents[startIdx + i] != blockBuffer[i]) {
          throw new IOException("salvaged file " + name + " differed " +
              "from what we expected on block " + blockIdx);
        }
      }
    }
  } finally {
    IOUtils.cleanup(null, in);
  }
}
@Test
public void testDoGetShouldCloseTheDFSInputStreamIfResponseGetOutPutStreamThrowsAnyException()
    throws Exception {
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(1)
      .build();
  try {
    Path testFile = createFile();
    setUpForDoGetTest(cluster, testFile);
    Mockito.doThrow(new IOException()).when(mockHttpServletResponse)
        .getOutputStream();
    DFSInputStream fsMock = Mockito.mock(DFSInputStream.class);
    Mockito.doReturn(fsMock).when(clientMock).open(testFile.toString());
    Mockito.doReturn(Long.valueOf(4)).when(fsMock).getFileLength();
    try {
      sfile.doGet(mockHttpServletRequest, mockHttpServletResponse);
      fail("Not throwing the IOException");
    } catch (IOException e) {
      Mockito.verify(clientMock, Mockito.atLeastOnce()).close();
    }
  } finally {
    cluster.shutdown();
  }
}
public static ByteBufferReader open(org.apache.hadoop.fs.FileSystem fileSystem,
    org.apache.hadoop.fs.Path path, long size, int blockCount, long readBase)
    throws IOException {
  FSDataInputStream stream = fileSystem.open(path);
  if (HDFS_READ_HACK_ENABLE) {
    if (IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE == null) {
      IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE = Boolean.parseBoolean(
          fileSystem.getConf().get("dfs.client.read.shortcircuit", "false"));
    }
    if (IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE) {
      InputStream is = stream.getWrappedStream();
      if (is instanceof DFSInputStream) {
        // Disable checksum verification when short-circuit local reads are enabled.
        MemoryUtil.setDFSInputStream_verifyChecksum(is, false);
        logger.debug("disable read check sum for: {}", path);
      }
    }
  }
  return new DFSByteBufferReader(
      path.toString(), stream, size, readBase, stream, blockCount);
}
public static DFSInputStream findDFSClientInputStream(FSDataInputStream in)
    throws SecurityException, NoSuchFieldException, IllegalArgumentException,
    IllegalAccessException {
  Field inField = FilterInputStream.class.getDeclaredField("in");
  inField.setAccessible(true);
  return (DFSInputStream) inField.get(in);
}
@SuppressWarnings("unchecked") public static ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getDeadNodes( DFSInputStream in) throws SecurityException, IllegalArgumentException, NoSuchFieldException, IllegalAccessException { Field deadNodesField = DFSInputStream.class.getDeclaredField("deadNodes"); deadNodesField.setAccessible(true); return (ConcurrentHashMap<DatanodeInfo, DatanodeInfo>) deadNodesField .get(in); }
public DFSInputStream open(String snapshotId, String src) throws IOException {
  LocatedBlocksWithMetaInfo[] blocks = getLocatedBlocks(snapshotId, src);
  // Not strictly correct: blocks.length == 1 could also mean a directory with
  // one file. Might want to add a file-specific API.
  if (blocks == null || blocks.length != 1) {
    throw new IOException("File at " + src + " doesn't exist in snapshot");
  }
  return client.open(blocks[0]);
}
public void checkSalvagedRemains() throws IOException {
  int chainIdx = 0;
  HdfsFileStatus status = dfsClient.getFileInfo(name);
  long length = status.getLen();
  int numBlocks = (int) ((length + blockSize - 1) / blockSize);
  DFSInputStream in = null;
  byte[] blockBuffer = new byte[blockSize];
  try {
    for (int blockIdx = 0; blockIdx < numBlocks; blockIdx++) {
      if (blocksToCorrupt.contains(blockIdx)) {
        if (in != null) {
          in.close();
          in = null;
        }
        continue;
      }
      if (in == null) {
        in = dfsClient.open("/lost+found" + name + "/" + chainIdx);
        chainIdx++;
      }
      int len = blockBuffer.length;
      if (blockIdx == (numBlocks - 1)) {
        // The last block might not be full-length.
        len = (int) (in.getFileLength() % blockSize);
        if (len == 0) {
          len = blockBuffer.length;
        }
      }
      IOUtils.readFully(in, blockBuffer, 0, len);
      int startIdx = blockIdx * blockSize;
      for (int i = 0; i < len; i++) {
        if (initialContents[startIdx + i] != blockBuffer[i]) {
          throw new IOException("salvaged file " + name + " differed " +
              "from what we expected on block " + blockIdx);
        }
      }
    }
  } finally {
    IOUtils.cleanup(null, in);
  }
}
// Variant of the loader above that wraps the raw DFSInputStream directly in an
// FSDataInputStream instead of going through createWrappedInputStream().
private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
  return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {
    @Override
    public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
      DFSClient client = getDfsClient(key.userId);
      DFSInputStream dis = client.open(key.inodePath);
      return new FSDataInputStream(dis);
    }
  };
}
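// For context, a CacheLoader like the two above would typically be handed to a
// Guava CacheBuilder. This is a rough sketch only: the size bound, idle timeout,
// eviction handling, and the (userId, inodePath) key constructor are assumptions,
// not anything the snippets above specify.
LoadingCache<DFSInputStreamCaheKey, FSDataInputStream> inputStreamCache =
    CacheBuilder.newBuilder()
        .maximumSize(1024)                           // assumed bound
        .expireAfterAccess(10, TimeUnit.MINUTES)     // assumed idle timeout
        .removalListener(
            new RemovalListener<DFSInputStreamCaheKey, FSDataInputStream>() {
              @Override
              public void onRemoval(
                  RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> n) {
                IOUtils.cleanup(null, n.getValue()); // close evicted streams
              }
            })
        .build(inputStreamLoader());

try {
  // Hypothetical lookup: userId and inodePath are placeholders.
  FSDataInputStream cached =
      inputStreamCache.get(new DFSInputStreamCaheKey(userId, inodePath));
} catch (ExecutionException e) {
  throw new IOException(e.getCause());
}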