ReaderPointer(Configuration configuration, Path path) throws IOException {
  _path = new Path(path.getParent(), path.getName() + ".pointer");
  _fileSystem = newFileSystem(_path, configuration);
  if (_fileSystem.exists(_path)) {
    // Recover the last fully written position from any existing pointer file.
    FSDataInputStream inputStream = _fileSystem.open(_path);
    DFSInputStream dfs = getDFS(inputStream);
    long fileLength = dfs.getFileLength();
    long offset = fileLength % 8;
    long filePosition = fileLength - offset - 8;
    if (filePosition >= 0) {
      inputStream.seek(filePosition);
      _position = inputStream.readLong();
    }
    inputStream.close();
  }
  // Start a fresh pointer file seeded with the recovered position.
  _outputStream = _fileSystem.create(_path, true);
  _outputStream.writeLong(_position);
  _outputStream.flush();
  _outputStream.sync();
}
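// A note on the recovery arithmetic above (not part of the original class): the
// pointer file is a sequence of 8-byte longs, and the last write may have been
// torn, so the constructor seeks to the start of the last fully written long.
// A minimal, hypothetical illustration of that offset calculation:
static long lastCompleteLongOffset(long fileLength) {
  long torn = fileLength % 8;      // bytes of a partially written trailing long, if any
  return fileLength - torn - 8;    // start of the last complete long; negative if none exists
}
// e.g. lastCompleteLongOffset(24) == 16, lastCompleteLongOffset(27) == 16, and
// lastCompleteLongOffset(5) == -8, which the constructor guards against with the >= 0 check.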
private long getTotalAckLength(FileSystem fileSystem) throws IOException {
  long size = 0;
  for (Path ackFile : _ackFiles) {
    DFSInputStream inputStream = getDFS(fileSystem.open(ackFile));
    size += inputStream.getFileLength();
    inputStream.close();
  }
  return size;
}
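// The getDFS helper used throughout these methods is not shown; a plausible sketch,
// assuming a Hadoop version where FSDataInputStream exposes getWrappedStream()
// (older releases may need reflection or a different accessor), is:
private static DFSInputStream getDFS(FSDataInputStream inputStream) {
  // Unwrap the underlying DFSInputStream so getFileLength() can report the length
  // of a file that is still being written, which FileStatus.getLen() may not reflect.
  return (DFSInputStream) inputStream.getWrappedStream();
}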
/**
 * This tests that DFSInputStream failures are counted for a given read
 * operation, and not over the lifetime of the stream. It is a regression
 * test for HDFS-127.
 */
public void testFailuresArePerOperation() throws Exception {
  long fileSize = 4096;
  Path file = new Path("/testFile");
  Configuration conf = new Configuration();
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);

  int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
  assertTrue(maxBlockAcquires > 0);

  try {
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    NameNode preSpyNN = cluster.getNameNode();
    NameNode spyNN = spy(preSpyNN);
    DFSClient client = new DFSClient(null, spyNN, conf, null);
    DFSTestUtil.createFile(fs, file, fileSize, (short)1, 12345L /*seed*/);

    // If the client will retry maxBlockAcquires times, then if we fail
    // any more than that number of times, the operation should entirely
    // fail.
    doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires + 1))
        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
    try {
      IOUtils.copyBytes(client.open(file.toString()),
                        new IOUtils.NullOutputStream(), conf, true);
      fail("Didn't get exception");
    } catch (IOException ioe) {
      DFSClient.LOG.info("Got expected exception", ioe);
    }

    // If we fail exactly that many times, then it should succeed.
    doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
    IOUtils.copyBytes(client.open(file.toString()),
                      new IOUtils.NullOutputStream(), conf, true);

    DFSClient.LOG.info("Starting test case for failure reset");

    // Now the tricky case - if we fail a few times on one read, then succeed,
    // then fail some more on another read, it shouldn't fail.
    doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
    DFSInputStream is = client.open(file.toString());
    byte buf[] = new byte[10];
    IOUtils.readFully(is, buf, 0, buf.length);

    DFSClient.LOG.info("First read successful after some failures.");

    // Further reads at this point will succeed since it has the good block locations.
    // So, force the block locations on this stream to be refreshed from bad info.
    // When reading again, it should start from a fresh failure count, since
    // we're starting a new operation on the user level.
    doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
    is.openInfo();

    // Seek to beginning forces a reopen of the BlockReader - otherwise it'll
    // just keep reading on the existing stream and the fact that we've poisoned
    // the block info won't do anything.
    is.seek(0);
    IOUtils.readFully(is, buf, 0, buf.length);
  } finally {
    cluster.shutdown();
  }
}
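// The test above relies on a FailNTimesAnswer helper that is not shown here. A
// hedged sketch of such a Mockito Answer: it answers getBlockLocations() by asking
// the real (un-spied) NameNode, but poisons the first N responses with a block list
// that points at an unreachable datanode, so each poisoned call counts as a block
// acquire failure on the client. Constructor signatures for LocatedBlock,
// DatanodeInfo, and DatanodeID vary between Hadoop versions, so treat the details
// as illustrative rather than exact.
private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
  private final NameNode realNN;
  private int failuresLeft;

  FailNTimesAnswer(NameNode realNN, int timesToFail) {
    this.realNN = realNN;
    this.failuresLeft = timesToFail;
  }

  public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
    Object[] args = invocation.getArguments();
    LocatedBlocks realAnswer = realNN.getBlockLocations(
        (String) args[0], (Long) args[1], (Long) args[2]);
    if (failuresLeft-- > 0) {
      // Replace the real locations with a single block hosted on an address
      // no datanode will ever answer on.
      LocatedBlock good = realAnswer.get(0);
      LocatedBlock bad = new LocatedBlock(
          good.getBlock(),
          new DatanodeInfo[] { new DatanodeInfo(new DatanodeID("255.255.255.255:234")) },
          good.getStartOffset(),
          false);
      return new LocatedBlocks(realAnswer.getFileLength(),
          Collections.singletonList(bad), false);
    }
    return realAnswer;
  }
}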
private boolean ackCheck() throws IOException {
  LOG.info("Starting ack check");
  BitSet bitSet = new BitSet();
  FileSystem fileSystem = null;
  try {
    _ackLock.lock();
    _ackOutputStream.close();
    fileSystem = newFileSystem(_file);
    FileStatus fileStatus = fileSystem.getFileStatus(_file);
    long dataLength = fileStatus.getLen();
    long totalAckLength = getTotalAckLength(fileSystem);
    if (!couldContainAllAcks(totalAckLength)) {
      LOG.info("Exiting early [" + totalAckLength + "] because [" + totalAckLength % 12 + "]");
      return false;
    }
    for (Path ackFile : _ackFiles) {
      LOG.info("Starting ack check for file [" + ackFile + "]");
      DFSInputStream inputStream = null;
      try {
        inputStream = getDFS(fileSystem.open(ackFile));
        long length = inputStream.getFileLength();
        DataInputStream dataInputStream = new DataInputStream(inputStream);
        while (length > 0) {
          int pos = (int) dataInputStream.readLong();
          // @TODO check position
          // 4 bytes for storing the length of the message
          int len = dataInputStream.readInt() + 4;
          bitSet.set(pos, pos + len);
          length -= 12;
        }
        if (bitSet.cardinality() == dataLength) {
          return true;
        }
      } finally {
        if (inputStream != null) {
          inputStream.close();
        }
      }
    }
    return false;
  } finally {
    reopenAckFile(fileSystem);
    _ackLock.unlock();
    if (fileSystem != null) {
      fileSystem.close();
    }
  }
}
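// couldContainAllAcks() is referenced above but not shown. Each ack record read in
// the loop above is 12 bytes (an 8-byte position followed by a 4-byte length), so a
// hypothetical sketch of the early-exit check, consistent with the "% 12" in the log
// message, is:
private boolean couldContainAllAcks(long totalAckLength) {
  // A total that is not a whole number of 12-byte records cannot be a complete,
  // uncorrupted set of acks, so skip the expensive per-file scan.
  return totalAckLength > 0 && totalAckLength % 12 == 0;
}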