public void testRead() throws Exception {
  for (int i = 0; i < TEST_FILE_NUM; ++i) {
    String file = "/tmp" + i + ".txt";
    DFSTestUtil.createFile(fs, new Path(file), FILE_LEN, (short)5, 1L);
    DFSDataInputStream in = (DFSDataInputStream)fs.open(new Path(file));
    int numOfRead = 0;
    while (in.read() > 0) {
      numOfRead++;
    }
    // The interval counters accumulate across iterations, so after the
    // (i+1)-th file they hold the totals for all files read so far.
    assertEquals(FILE_LEN * (i + 1), metrics.readSize.getCurrentIntervalValue());
    assertEquals(numOfRead * (i + 1), metrics.readOps.getCurrentIntervalValue());
  }
}
public void runPreadTest(Configuration conf) throws Exception {
  DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem(conf);
  String fileName = "/test";
  Path p = new Path(fileName);
  for (int pri = 0; pri < 8; pri++) {
    createFile(p, pri);
    ioprioClass = ioprioData = 0;
    DFSDataInputStream in = (DFSDataInputStream) fs.open(p);
    byte[] buffer = new byte[BLOCK_SIZE * 2];
    ReadOptions options = new ReadOptions();
    options.setIoprio(NativeIO.IOPRIO_CLASS_BE, pri);
    in.read(BLOCK_SIZE / 2, buffer, 0, BLOCK_SIZE / 2, options);
    // Verify that the best-effort I/O priority requested on the pread
    // was propagated down to the native layer.
    if (NativeIO.isAvailable()) {
      assertTrue(NativeIO.isIoprioPossible());
      assertEquals(NativeIO.IOPRIO_CLASS_BE, ioprioClass);
      assertEquals(pri, ioprioData);
    }
  }
}
public void collectSrcBlocksChecksum(ChecksumStore ckmStore) throws IOException {
  if (ckmStore == null) {
    return;
  }
  LOG.info("Store the checksums of source blocks into checksumStore");
  for (int i = 0; i < streams.length; i++) {
    if (streams[i] != null && streams[i] instanceof DFSDataInputStream
        && !(streams[i] instanceof RaidUtils.ZeroInputStream)) {
      DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
      Long newVal = checksums[i].getValue();
      ckmStore.putIfAbsentChecksum(stream.getCurrentBlock(), newVal);
    }
  }
}
static void checkFile(Path p, int expectedsize, final Configuration conf)
    throws IOException, InterruptedException {
  // Open the file with another user account.
  final String username = UserGroupInformation.getCurrentUser().getShortUserName()
      + "_" + ++userCount;
  UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
      username, new String[] {"supergroup"});
  final FileSystem fs = DFSTestUtil.getFileSystemAs(ugi, conf);
  final DFSDataInputStream in = (DFSDataInputStream)fs.open(p);

  // Check visible length.
  Assert.assertTrue(in.getVisibleLength() >= expectedsize);

  // Able to read?
  for (int i = 0; i < expectedsize; i++) {
    Assert.assertEquals((byte)i, (byte)in.read());
  }
  in.close();
}
public static int readTillEnd(InputStream in, byte[] buf, boolean eofOK,
    long endOffset, int toRead) throws IOException {
  int numRead = 0;
  while (numRead < toRead) {
    int readLen = toRead - numRead;
    if (in instanceof DFSDataInputStream) {
      int available = (int)(endOffset - ((DFSDataInputStream)in).getPos());
      if (available < readLen) {
        readLen = available;
      }
    }
    int nread = readLen > 0 ? in.read(buf, numRead, readLen) : 0;
    if (nread < 0) {
      if (eofOK) {
        // EOF hit, fill with zeros.
        Arrays.fill(buf, numRead, toRead, (byte)0);
        break;
      } else {
        // EOF hit, throw.
        throw new IOException("Premature EOF");
      }
    } else if (nread == 0) {
      // Reached endOffset, fill with zeros.
      Arrays.fill(buf, numRead, toRead, (byte)0);
      break;
    } else {
      numRead += nread;
    }
  }
  // Return 0 if we read a ZeroInputStream.
  if (in instanceof ZeroInputStream) {
    return 0;
  }
  return numRead;
}
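A minimal caller sketch for readTillEnd. The names in, blockSize, and blockEndOffset are illustrative assumptions, not identifiers from the code above; with eofOK set, the tail of the buffer is zero-filled when the stream runs out before toRead bytes arrive.

// Hedged usage sketch; 'in', 'blockSize', and 'blockEndOffset' are
// assumptions for illustration only.
byte[] buf = new byte[blockSize];
int n = readTillEnd(in, buf, true /* eofOK: zero-fill past EOF */,
    blockEndOffset, blockSize);
// n is the count of real bytes read; buf[n..blockSize) holds zeros
// whenever EOF or blockEndOffset was hit first.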
/**
 * Reads data from multiple streams in parallel and puts the data in a queue.
 *
 * @param streams
 *          The input streams to read from.
 * @param bufSize
 *          The amount of data to read from each stream in each go.
 * @param numThreads
 *          Number of threads to use for parallelism.
 * @param boundedBufferCapacity
 *          The capacity of the queue the results are placed in.
 */
public ParallelStreamReader(Progressable reporter,
    InputStream[] streams,
    int bufSize,
    int numThreads,
    int boundedBufferCapacity,
    long maxBytesPerStream) throws IOException {
  this.reporter = reporter;
  this.streams = new InputStream[streams.length];
  this.endOffsets = new long[streams.length];
  for (int i = 0; i < streams.length; i++) {
    this.streams[i] = streams[i];
    if (this.streams[i] instanceof DFSDataInputStream) {
      DFSDataInputStream stream = (DFSDataInputStream) this.streams[i];
      // In directory raiding, the block size for each input stream
      // might be different, so we need to determine the endOffset of
      // each stream from its own block size.
      List<LocatedBlock> blocks = stream.getAllBlocks();
      if (blocks.size() == 0) {
        this.endOffsets[i] = Long.MAX_VALUE;
      } else {
        this.endOffsets[i] = stream.getPos() + blocks.get(0).getBlockSize();
      }
    } else {
      this.endOffsets[i] = Long.MAX_VALUE;
    }
    streams[i] = null; // Take over ownership of streams.
  }
  this.bufSize = bufSize;
  this.boundedBuffer =
      new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
  if (numThreads > streams.length) {
    this.numThreads = streams.length;
  } else {
    this.numThreads = numThreads;
  }
  this.remainingBytesPerStream = maxBytesPerStream;
  this.slots = new Semaphore(this.numThreads);
  this.readPool = Executors.newFixedThreadPool(this.numThreads);
  this.mainThread = new MainThread();
}
/**
 * Verifies that reading a file with the direct read(ByteBuffer) API gives the
 * expected set of bytes.
 */
static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
    int readOffset) throws IOException {
  DFSDataInputStream stm = (DFSDataInputStream) fs.open(name);

  ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);

  IOUtils.skipFully(stm, readOffset);

  // Read a small number of bytes first.
  actual.limit(3);
  int nread = stm.read(actual);
  actual.limit(nread + 2);
  nread += stm.read(actual);

  // Read across chunk boundary.
  actual.limit(Math.min(actual.capacity(), nread + 517));
  nread += stm.read(actual);
  checkData(arrayFromByteBuffer(actual), readOffset, expected, nread,
      "A few bytes");

  // Now read the rest of it.
  actual.limit(actual.capacity());
  while (actual.hasRemaining()) {
    int nbytes = stm.read(actual);
    if (nbytes < 0) {
      throw new EOFException("End of file reached before reading fully.");
    }
    nread += nbytes;
  }
  checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
  stm.close();
}
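Distilled from the test above, a hedged sketch of the plain read(ByteBuffer) loop against a DFSDataInputStream; fs, name, and the file length len are assumptions standing in for the test's setup.

// Hedged sketch; 'fs', 'name', and 'len' are illustrative assumptions.
DFSDataInputStream in = (DFSDataInputStream) fs.open(name);
ByteBuffer buf = ByteBuffer.allocateDirect(len);
while (buf.hasRemaining()) {
  if (in.read(buf) < 0) {
    break; // EOF before the buffer was filled.
  }
}
in.close();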
@SuppressWarnings("deprecation") @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { statistics.incrementReadOps(1); return new DFSClient.DFSDataInputStream( dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); }
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in =
      (DFSDataInputStream)((DistributedFileSystem)fs).open(path);
  // Read one byte so the client connects to a datanode and the current
  // block is populated before we ask for it.
  in.readByte();
  return in.getCurrentBlock();
}
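A hedged usage sketch for getFirstBlock, assuming a test-style fs and an existing file at path; printing the block id via Block.getBlockId() is one plausible way to use the result.

// Hedged usage sketch; 'fs' and 'path' are assumed to come from test setup.
Block first = getFirstBlock(fs, path);
System.out.println("first block of " + path + ": " + first.getBlockId());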
public void testUnfavoredNodes() throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean("dfs.client.block.location.renewal.enabled", false);
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
  try {
    FileSystem fs = cluster.getFileSystem();
    DistributedFileSystem dfs = DFSUtil.convertToDFS(fs);
    TestCase.assertNotNull(dfs);
    Path path = new Path("/testUnfavoredNodes");
    FSDataOutputStream stm = fs.create(path, true, 4096, (short) 2, (long) 2048);
    stm.write(new byte[4096]);
    stm.close();

    FSDataInputStream is = fs.open(path);
    DFSDataInputStream dis = (DFSDataInputStream) is;
    TestCase.assertNotNull(dis);

    // Mark the datanode we are currently reading from as unfavored and
    // check that the client switches to the other replica.
    is.read(new byte[1024]);
    DatanodeInfo currentDn1 = dis.getCurrentDatanode();
    dis.setUnfavoredNodes(Arrays.asList(new DatanodeInfo[] { currentDn1 }));
    is.read(new byte[512]);
    DatanodeInfo currentDn2 = dis.getCurrentDatanode();
    TestCase.assertFalse(currentDn2.equals(currentDn1));

    // With both replicas unfavored, the client has no choice but to fall
    // back to an unfavored node.
    dis.setUnfavoredNodes(Arrays.asList(
        new DatanodeInfo[] { currentDn1, currentDn2 }));
    is.read(new byte[512]);
    TestCase.assertEquals(currentDn1, dis.getCurrentDatanode());

    // Crossing into the second block resets the choice to that block's
    // first location.
    is.read(new byte[1024]);
    TestCase.assertEquals(dis.getAllBlocks().get(1).getLocations()[0],
        dis.getCurrentDatanode());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Reads data from multiple streams in parallel and puts the data in a queue.
 * @param streams The input streams to read from.
 * @param bufSize The amount of data to read from each stream in each go.
 * @param numThreads Number of threads to use for parallelism.
 * @param boundedBufferCapacity The capacity of the queue the results are placed in.
 */
public ParallelStreamReader(
    Progressable reporter,
    InputStream[] streams,
    int bufSize,
    int numThreads,
    int boundedBufferCapacity,
    long maxBytesPerStream,
    boolean computeChecksum,
    OutputStream[] outs) throws IOException {
  this.reporter = reporter;
  this.computeChecksum = computeChecksum;
  this.streams = new InputStream[streams.length];
  this.endOffsets = new long[streams.length];
  if (computeChecksum) {
    this.checksums = new CRC32[streams.length];
  }
  this.outs = outs;
  for (int i = 0; i < streams.length; i++) {
    this.streams[i] = streams[i];
    if (this.streams[i] instanceof DFSDataInputStream) {
      DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
      // In directory raiding, the block size for each input stream
      // might be different, so we need to determine the endOffset of
      // each stream from its own block size.
      List<LocatedBlock> blocks = stream.getAllBlocks();
      if (blocks.size() == 0) {
        this.endOffsets[i] = Long.MAX_VALUE;
        if (computeChecksum) {
          this.checksums[i] = null;
        }
      } else {
        long blockSize = blocks.get(0).getBlockSize();
        this.endOffsets[i] = stream.getPos() + blockSize;
        if (computeChecksum) {
          this.checksums[i] = new CRC32();
        }
      }
    } else {
      this.endOffsets[i] = Long.MAX_VALUE;
      if (computeChecksum) {
        this.checksums[i] = null;
      }
    }
    streams[i] = null; // Take over ownership of streams.
  }
  this.bufSize = bufSize;
  this.boundedBuffer =
      new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
  if (numThreads > streams.length) {
    this.numThreads = streams.length;
  } else {
    this.numThreads = numThreads;
  }
  this.remainingBytesPerStream = maxBytesPerStream;
  this.slots = new Semaphore(this.numThreads);
  ThreadFactory parallelStreamReaderFactory = new ThreadFactoryBuilder()
      .setNameFormat("ParallelStreamReader-read-pool-%d")
      .build();
  this.readPool = Executors.newFixedThreadPool(this.numThreads,
      parallelStreamReaderFactory);
  this.mainThread = new MainThread();
  mainThread.setName("ParallelStreamReader-main");
}
public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
    throws IOException {
  return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
}
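A hedged usage sketch for the getAllBlocks helper; fs and path are illustrative assumptions, and LocatedBlock's getBlock() and getLocations() accessors are the same ones used elsewhere in this section.

// Hedged usage sketch; 'fs' and 'path' are illustrative assumptions.
FSDataInputStream in = fs.open(path);
for (LocatedBlock lb : getAllBlocks(in)) {
  System.out.println(lb.getBlock() + " has "
      + lb.getLocations().length + " replica locations");
}
in.close();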
/**
 * We need to find the blocks that didn't match. Likely only one
 * is corrupt, but we will report both to the namenode. In the future,
 * we can consider figuring out exactly which block is corrupt.
 */
public boolean reportChecksumFailure(Path f, FSDataInputStream in, long inPos,
    FSDataInputStream sums, long sumsPos) {
  if (!(in instanceof DFSDataInputStream && sums instanceof DFSDataInputStream)) {
    throw new IllegalArgumentException(
        "Input streams must be types of DFSDataInputStream");
  }

  LocatedBlock[] lblocks = new LocatedBlock[2];

  // Find block in data stream.
  DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
  Block dataBlock = dfsIn.getCurrentBlock();
  if (dataBlock == null) {
    LOG.error("Error: Current block in data stream is null! ");
    return false;
  }
  DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
  lblocks[0] = new LocatedBlock(dataBlock, dataNode);
  LOG.info("Found checksum error in data stream at block=" + dataBlock
      + " on datanode=" + dataNode[0].getName());

  // Find block in checksum stream.
  DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
  Block sumsBlock = dfsSums.getCurrentBlock();
  if (sumsBlock == null) {
    LOG.error("Error: Current block in checksum stream is null! ");
    return false;
  }
  DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
  lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
  LOG.info("Found checksum error in checksum stream at block=" + sumsBlock
      + " on datanode=" + sumsNode[0].getName());

  // Ask client to delete blocks.
  dfs.reportChecksumFailure(f.toString(), lblocks);
  return true;
}