private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination, int namespaceId) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); out.writeInt(namespaceId); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); short status = reply.readShort(); if(status == DataTransferProtocol.OP_STATUS_SUCCESS) { return true; } return false; }
/**
 * Opens the ledger described by the given metadata as an edit log input
 * stream and runs {@code FSEditLogLoader.validateEditLog} over it.
 *
 * If the stream cannot be created because the ledger header is corrupt,
 * a warning is logged and a validation result with zero transactions,
 * invalid start/end transaction ids and the corrupt-header flag set is
 * returned instead.
 *
 * @param ledgerProvider provider handed to the input stream
 * @param ledgerMetadata metadata naming the ledger and its txid range
 * @return the validation result
 * @throws IOException if opening or validating the stream fails
 */
public static FSEditLogLoader.EditLogValidation validateEditLog(
    LedgerHandleProvider ledgerProvider,
    EditLogLedgerMetadata ledgerMetadata) throws IOException {
  final BookKeeperEditLogInputStream stream;
  try {
    stream = new BookKeeperEditLogInputStream(
        ledgerProvider,
        ledgerMetadata.getLedgerId(),
        0,
        ledgerMetadata.getFirstTxId(),
        ledgerMetadata.getLastTxId(),
        ledgerMetadata.getLastTxId() == -1);
  } catch (LedgerHeaderCorruptException headerCorrupt) {
    LOG.warn("Log at ledger id" + ledgerMetadata.getLedgerId()
        + " has no valid header", headerCorrupt);
    return new FSEditLogLoader.EditLogValidation(
        0, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true);
  }

  try {
    return FSEditLogLoader.validateEditLog(stream);
  } finally {
    // Always release the stream, whether validation succeeded or threw.
    IOUtils.closeStream(stream);
  }
}
long validateAndGetEndTxId(EditLogLedgerMetadata ledger, boolean fence) throws IOException { FSEditLogLoader.EditLogValidation val; if (!fence) { val = BookKeeperEditLogInputStream.validateEditLog(this, ledger); } else { val = BookKeeperEditLogInputStream.validateEditLog( new FencingLedgerHandleProvider(), ledger); } InjectionHandler.processEvent(InjectionEvent.BKJM_VALIDATELOGSEGMENT, val); if (val.getNumTransactions() == 0) { return HdfsConstants.INVALID_TXID; // Ledger is corrupt } return val.getEndTxId(); }
/**
 * Selects input streams from the journal starting at the given txid and
 * returns one of them.
 *
 * The selected streams are sorted with
 * {@code JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR}. When {@code txId} is
 * {@code HdfsConstants.INVALID_TXID} the first stream in sort order is
 * returned; otherwise the first stream whose first txid equals {@code txId}.
 *
 * @param jm journal manager to select streams from
 * @param txId transaction id the returned stream must start at
 * @return the matching stream
 * @throws IOException if no stream was selected or none starts at txId
 */
static EditLogInputStream getJournalInputStreamDontCheckLastTxId(
    JournalManager jm, long txId) throws IOException {
  final List<EditLogInputStream> candidates =
      new ArrayList<EditLogInputStream>();
  jm.selectInputStreams(candidates, txId, true, false);
  if (candidates.isEmpty()) {
    throw new IOException("Cannot obtain stream for txid: " + txId);
  }
  Collections.sort(candidates, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);

  if (txId == HdfsConstants.INVALID_TXID) {
    return candidates.get(0);
  }

  for (int i = 0; i < candidates.size(); i++) {
    final EditLogInputStream candidate = candidates.get(i);
    if (candidate.getFirstTxId() == txId) {
      return candidate;
    }
  }
  throw new IOException("Cannot obtain stream for txid: " + txId);
}
/**
 * Builds the image compression instance selected by the configuration.
 *
 * @param conf configuration consulted for the compress flag and codec name
 * @param forceUncompressed when true the configuration is not consulted and
 *        a no-op compression is returned
 * @return a no-op compression when compression is disabled or overridden,
 *         otherwise the compression for the configured codec class name
 * @throws IOException if the specified codec is not available.
 */
static FSImageCompression createCompression(Configuration conf,
    boolean forceUncompressed) throws IOException {
  // Short-circuit keeps the configuration lookup off the forced path.
  if (forceUncompressed
      || !conf.getBoolean(HdfsConstants.DFS_IMAGE_COMPRESS_KEY,
          HdfsConstants.DFS_IMAGE_COMPRESS_DEFAULT)) {
    return createNoopCompression();
  }

  final String codecName = conf.get(
      HdfsConstants.DFS_IMAGE_COMPRESSION_CODEC_KEY,
      HdfsConstants.DFS_IMAGE_COMPRESSION_CODEC_DEFAULT);
  return createCompression(conf, codecName);
}
/** * Get input stream from the given journal starting at txid. * Does not perform validation of the streams. * * This should only be used for tailing inprogress streams!!! */ public static EditLogInputStream getInputStream(JournalManager jm, long txid) throws IOException { List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); jm.selectInputStreams(streams, txid, true, false); if (streams.size() < 1) { throw new IOException("Cannot obtain stream for txid: " + txid); } Collections.sort(streams, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); // we want the "oldest" available stream if (txid == HdfsConstants.INVALID_TXID) { return streams.get(0); } // we want a specific stream for (EditLogInputStream elis : streams) { if (elis.getFirstTxId() == txid) { return elis; } } // we cannot obtain the stream throw new IOException("Cannot obtain stream for txid: " + txid); }
static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOException { EditLogFileInputStream in; try { in = new EditLogFileInputStream(file); in.getVersion(); } catch (LogHeaderCorruptException corrupt) { // If it's missing its header, this is equivalent to no transactions FSImage.LOG.warn("Log at " + file + " has no valid header", corrupt); return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true); } try { return FSEditLogLoader.validateEditLog(in); } finally { IOUtils.closeStream(in); } }
EditLogFile(File file, long firstTxId, long lastTxId, boolean isInProgress) { boolean checkTxIds = true; checkTxIds &= ((lastTxId == HdfsConstants.INVALID_TXID && isInProgress) || (lastTxId != HdfsConstants.INVALID_TXID && lastTxId >= firstTxId)); checkTxIds &= ((firstTxId > -1) || (firstTxId == HdfsConstants.INVALID_TXID)); if (!checkTxIds) throw new IllegalArgumentException("Illegal transaction ids: " + firstTxId + ", " + lastTxId + " in progress: " + isInProgress); if(file == null) throw new IllegalArgumentException("File can not be NULL"); this.firstTxId = firstTxId; this.lastTxId = lastTxId; this.file = file; this.isInProgress = isInProgress; }
/** * Scan the local storage directory, and return the segment containing * the highest transaction. * @return the EditLogFile with the highest transactions, or null * if no files exist. */ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException { if (!fjm.getStorageDirectory().getCurrentDir().exists()) { return null; }"Scanning storage " + fjm); List<EditLogFile> files = fjm.getLogFiles(0); while (!files.isEmpty()) { EditLogFile latestLog = files.remove(files.size() - 1); latestLog.validateLog();"Latest log is " + latestLog); if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) { // the log contains no transactions LOG.warn("Latest log " + latestLog + " has no transactions. " + "moving it aside and looking for previous log"); latestLog.moveAsideEmptyFile(); } else { return latestLog; } }"No files in " + fjm); return null; }
/** * @return the current state of the given segment, or null if the * segment does not exist. */ private SegmentStateProto getSegmentInfo(long segmentTxId) throws IOException { EditLogFile elf = fjm.getLogFile(segmentTxId); if (elf == null) { return null; } if (elf.isInProgress()) { elf.validateLog(); } if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {"Edit log file " + elf + " appears to be empty. " + "Moving it aside..."); elf.moveAsideEmptyFile(); return null; } SegmentStateProto ret = new SegmentStateProto(segmentTxId, elf.getLastTxId(), elf.isInProgress());"getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + ret); return ret; }
/** * try to access a block on a data node. If fails - throws exception * @param datanode * @param lblock * @throws IOException */ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock) throws IOException { InetSocketAddress targetAddr = null; Socket s = null; DFSClient.BlockReader blockReader = null; Block block = lblock.getBlock(); targetAddr = NetUtils.createSocketAddr(datanode.getName()); s = new Socket(); s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); blockReader = DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" + block.getBlockId(), block.getBlockId(), lblock.getBlockToken(), block.getGenerationStamp(), 0, -1, 4096); // nothing - if it fails - it will throw and exception }
private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); BlockTokenSecretManager.DUMMY_TOKEN.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); short status = reply.readShort(); if(status == DataTransferProtocol.OP_STATUS_SUCCESS) { return true; } return false; }
/**
 * Sets up a mocked two-block file whose block states are passed to
 * {@code mockFileBlocks} as {@code null} and UNDER_CONSTRUCTION, then
 * invokes lease recovery on it.
 *
 * The test expects {@link IOException}; the trailing assertion only runs,
 * and fails the test, if no exception was thrown.
 *
 * @throws IOException as the result
 */
@Test(expected=IOException.class)
public void testInternalReleaseLease_UNKNOWN_COMM() throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final LeaseManager.Lease lease = mock(LeaseManager.Lease.class);
  final String pathName = "/" + GenericTestUtils.getMethodName() + "_test.dat";
  final Path file = spy(new Path(pathName));
  final DatanodeDescriptor datanode = mock(DatanodeDescriptor.class);
  final PermissionStatus permissions =
      new PermissionStatus("test", "test", new FsPermission((short)0777));

  mockFileBlocks(2, null, HdfsConstants.BlockUCState.UNDER_CONSTRUCTION,
      file, datanode, permissions, false);

  fsn.internalReleaseLease(lease, file.toString(), null);

  // Reaching this line means the expected exception did not occur.
  assertTrue("FSNamesystem.internalReleaseLease suppose to throw "
      + "IOException here", false);
}
/**
 * Sets up a mocked two-block file whose block states are both COMMITTED,
 * then invokes lease recovery on it.
 *
 * The test expects {@code AlreadyBeingCreatedException}; the trailing
 * assertion only runs, and fails the test, if no exception was thrown.
 *
 * @throws AlreadyBeingCreatedException as the result
 */
@Test(expected=AlreadyBeingCreatedException.class)
public void testInternalReleaseLease_COMM_COMM() throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final LeaseManager.Lease lease = mock(LeaseManager.Lease.class);
  final String pathName = "/" + GenericTestUtils.getMethodName() + "_test.dat";
  final Path file = spy(new Path(pathName));
  final DatanodeDescriptor datanode = mock(DatanodeDescriptor.class);
  final PermissionStatus permissions =
      new PermissionStatus("test", "test", new FsPermission((short)0777));

  mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
      HdfsConstants.BlockUCState.COMMITTED,
      file, datanode, permissions, false);

  fsn.internalReleaseLease(lease, file.toString(), null);

  // Reaching this line means the expected exception did not occur.
  assertTrue("FSNamesystem.internalReleaseLease suppose to throw "
      + "AlreadyBeingCreatedException here", false);
}
/**
 * Sets up a mocked single-block file (states passed to
 * {@code mockFileBlocks} are {@code null} and COMMITTED), then invokes
 * lease recovery on it.
 *
 * The test expects {@code AlreadyBeingCreatedException}; the trailing
 * assertion only runs, and fails the test, if no exception was thrown.
 *
 * @throws AlreadyBeingCreatedException as the result
 */
@Test(expected=AlreadyBeingCreatedException.class)
public void testInternalReleaseLease_1blocks() throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final LeaseManager.Lease lease = mock(LeaseManager.Lease.class);
  final String pathName = "/" + GenericTestUtils.getMethodName() + "_test.dat";
  final Path file = spy(new Path(pathName));
  final DatanodeDescriptor datanode = mock(DatanodeDescriptor.class);
  final PermissionStatus permissions =
      new PermissionStatus("test", "test", new FsPermission((short)0777));

  mockFileBlocks(1, null, HdfsConstants.BlockUCState.COMMITTED,
      file, datanode, permissions, false);

  fsn.internalReleaseLease(lease, file.toString(), null);

  // Reaching this line means the expected exception did not occur.
  assertTrue("FSNamesystem.internalReleaseLease suppose to throw "
      + "AlreadyBeingCreatedException here", false);
}
/**
 * Sets up a mocked two-block file whose block states are COMMITTED and
 * UNDER_CONSTRUCTION, invokes lease recovery on it and asserts that the
 * call returns <code>false</code>.
 *
 * @throws IOException in case of an error
 */
@Test
public void testInternalReleaseLease_COMM_CONSTRUCTION() throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final LeaseManager.Lease lease = mock(LeaseManager.Lease.class);
  final String pathName = "/" + GenericTestUtils.getMethodName() + "_test.dat";
  final Path file = spy(new Path(pathName));
  final DatanodeDescriptor datanode = mock(DatanodeDescriptor.class);
  final PermissionStatus permissions =
      new PermissionStatus("test", "test", new FsPermission((short)0777));

  mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION,
      file, datanode, permissions, false);

  final boolean released =
      fsn.internalReleaseLease(lease, file.toString(), null);
  assertFalse("False is expected in return in this case", released);
}
/**
 * Sets up a mocked two-block file (COMMITTED, UNDER_CONSTRUCTION), calls
 * {@code commitBlockSynchronization} on its last block and, if that throws
 * an IOException, checks that the message starts with "Block (=".
 *
 * Note that the test also passes when no exception is thrown.
 *
 * @throws IOException if the setup fails
 */
@Test
public void testCommitBlockSynchronization_BlockNotFound()
    throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final long recoveryId = 2002;
  final long newSize = 273487234;

  final String pathName = "/" + GenericTestUtils.getMethodName() + "_test.dat";
  final Path file = spy(new Path(pathName));
  final DatanodeDescriptor datanode = mock(DatanodeDescriptor.class);
  final PermissionStatus permissions =
      new PermissionStatus("test", "test", new FsPermission((short)0777));

  mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED,
      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION,
      file, datanode, permissions, false);

  final BlockInfo lastBlock =
      fsn.dir.getFileINode(anyString()).getLastBlock();
  try {
    fsn.commitBlockSynchronization(lastBlock, recoveryId, newSize,
        true, false, new DatanodeID[1]);
  } catch (IOException expected) {
    final String message = expected.getMessage();
    assertTrue(message.startsWith("Block (="));
  }
}
/** * try to access a block on a data node. If fails - throws exception * @param datanode * @param lblock * @throws IOException */ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock) throws IOException { InetSocketAddress targetAddr = null; Socket s = null; BlockReader blockReader = null; Block block = lblock.getBlock(); targetAddr = NetUtils.createSocketAddr(datanode.getName()); s = new Socket(); s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); String file = BlockReader.getFileName(targetAddr, block.getBlockId()); blockReader = BlockReader.newBlockReader(s, file, block, lblock .getBlockToken(), 0, -1, 4096); // nothing - if it fails - it will throw and exception }
/**
 * Sends a replace-block request to the destination datanode.
 *
 * The request written to the socket consists of the data transfer version,
 * the REPLACE_BLOCK op, the block id and generation stamp, the storage id
 * of the source, the serialized proxy source and the dummy block token.
 * A DataInputStream is then opened on the socket for the reply.
 *
 * NOTE(review): the final statement reads "return == SUCCESS;" - the
 * expression that reads the status from the reply stream is missing, so the
 * method does not compile as written. Restore it from the upstream source.
 * NOTE(review): the socket is never closed in this method.
 *
 * @param block the block to be replaced
 * @param source the datanode whose storage id is sent as the source
 * @param sourceProxy the datanode written to the request as the proxy source
 * @param destination the datanode the request is sent to
 * @return presumably whether the reply status equals SUCCESS - confirm once
 *         the return statement is restored
 * @throws IOException if connecting, writing or reading fails
 */
private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); REPLACE_BLOCK.write(out); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); BlockTokenSecretManager.DUMMY_TOKEN.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); return == SUCCESS; }
/**
 * Get a BlockReader for the given block.
 *
 * Connects to the first location of the block and creates a reader over
 * that socket. The buffer size is read from "io.file.buffer.size" with a
 * default of 4096. If connecting or creating the reader fails, the socket
 * is closed before the exception propagates; on success the socket is left
 * open for the returned reader.
 *
 * @param testBlock the located block to read; its first location is used
 * @param offset start offset passed to the reader
 * @param lenToRead length passed to the reader
 * @return a reader for the block
 * @throws IOException if connecting or creating the reader fails
 */
public BlockReader getBlockReader(LocatedBlock testBlock, int offset,
    int lenToRead) throws IOException {
  Block block = testBlock.getBlock();
  DatanodeInfo[] nodes = testBlock.getLocations();
  InetSocketAddress targetAddr =
      NetUtils.createSocketAddr(nodes[0].getName());

  Socket sock = new Socket();
  boolean handedOff = false;
  try {
    sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
    sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);

    BlockReader reader = BlockReader.newBlockReader(
        sock, targetAddr.toString() + ":" + block.getBlockId(), block,
        testBlock.getBlockToken(),
        offset, lenToRead,
        conf.getInt("io.file.buffer.size", 4096));
    handedOff = true;
    return reader;
  } finally {
    // Only clean up on failure; the returned reader still needs the socket.
    if (!handedOff) {
      sock.close();
    }
  }
}
static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOException { EditLogFileInputStream in; try { in = new EditLogFileInputStream(file); } catch (LogHeaderCorruptException corrupt) { // If it's missing its header, this is equivalent to no transactions FSImage.LOG.warn("Log at " + file + " has no valid header", corrupt); return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID); } try { return FSEditLogLoader.validateEditLog(in); } finally { IOUtils.closeStream(in); } }
private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); short status = reply.readShort(); if(status == DataTransferProtocol.OP_STATUS_SUCCESS) { return true; } return false; }
/**
 * {@inheritDoc}
 *
 * Writes an XML document to the response containing the file checksum of
 * the requested file. The checksum is obtained through a namenode proxy
 * created from a copy of the datanode configuration with the caller's UGI
 * saved into it. If computing the checksum throws an IOException, the
 * exception is written to the XML document as a RemoteException instead.
 */
public void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  final UnixUserGroupInformation callerUgi = getUGI(request);
  final PrintWriter writer = response.getWriter();
  final String path = getFilename(request, response);

  final XMLOutputter doc = new XMLOutputter(writer, "UTF-8");
  doc.declaration();

  // Work on a private copy so the caller's UGI does not leak into the
  // datanode's own configuration object.
  final Configuration localConf =
      new Configuration(DataNode.getDataNode().getConf());
  final int timeout =
      localConf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);
  final SocketFactory sockets =
      NetUtils.getSocketFactory(localConf, ClientProtocol.class);
  UnixUserGroupInformation.saveToConf(localConf,
      UnixUserGroupInformation.UGI_PROPERTY_NAME, callerUgi);
  final ClientProtocol namenode = DFSClient.createNamenode(localConf);

  try {
    final MD5MD5CRC32FileChecksum fileChecksum =
        DFSClient.getFileChecksum(path, namenode, sockets, timeout);
    MD5MD5CRC32FileChecksum.write(doc, fileChecksum);
  } catch (IOException failure) {
    final RemoteException wrapped = new RemoteException(
        failure.getClass().getName(), failure.getMessage());
    wrapped.writeXml(path, doc);
  }
  doc.endDocument();
}
@Test public void testValidateEmptyEditLog() throws IOException { File testDir = new File(TEST_DIR, "testValidateEmptyEditLog"); SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap(); File logFile = prepareUnfinalizedTestEditLog(testDir, 0, offsetToTxId); // Truncate the file so that there is nothing except the header truncateFile(logFile, 4); EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile); assertTrue(!validation.hasCorruptHeader()); assertEquals(HdfsConstants.INVALID_TXID, validation.getEndTxId()); }
/**
 * Creates a writer whose block data file is an anonymous
 * {@code BlockDataFile} that ignores the requested buffer size and always
 * hands out a writer wrapping the supplied output stream.
 *
 * @param dataOut stream wrapped by every writer handed out
 * @param block the block stored on this instance
 * @param checksumType checksum type passed to the superclass
 * @param bytesPerChecksum bytes per checksum passed to the superclass
 */
SimulatedBlockInlineChecksumFileWriter(final OutputStream dataOut,
    Block block, int checksumType, int bytesPerChecksum) {
  super(
      new BlockDataFile(null, null) {
        @Override
        public Writer getWriter(int unusedBufferSize) {
          return new BlockDataFile.Writer(dataOut, null, null);
        }
      },
      checksumType,
      bytesPerChecksum,
      HdfsConstants.DEFAULT_PACKETSIZE);
  this.block = block;
}
/**
 * Truncate the given block in place, such that the new truncated block
 * is still valid (ie checksums are updated to stay in sync with block file)
 *
 * With inline checksums the truncation is delegated to a
 * {@code BlockInlineChecksumWriter} (CRC32, bytes-per-checksum read from
 * "io.bytes.per.checksum" with a default of 512). Otherwise the meta file
 * for the block file is located and a {@code BlockWithChecksumFileWriter}
 * truncates from the block file's current length to the new length.
 * Afterwards the replica's in-memory size is synced.
 *
 * NOTE(review): the first statement reads "FSDataset ds = (FSDataset);" -
 * the expression being cast is missing, so the method does not compile as
 * written. Presumably it is the datanode's dataset; restore from upstream.
 *
 * @param dn the datanode holding the block
 * @param block the block to truncate
 * @param newLength the length passed to the checksum writer
 * @param namespaceId namespace id used to look up the replica
 * @param useInlineChecksum selects the inline-checksum writer when true
 * @throws IOException if the block's data file cannot be found or the
 *         truncation fails
 */
public static void truncateBlock(DataNode dn, Block block, long newLength,
    int namespaceId, boolean useInlineChecksum) throws IOException {
  FSDataset ds = (FSDataset);

  ReplicaToRead rr = ds.getReplicaToRead(namespaceId, block);
  File blockFile = rr.getDataFileToRead();
  if (blockFile == null) {
    throw new IOException("Can't find block file for block " +
        block + " on DN " + dn);
  }
  if (useInlineChecksum) {
    // Checksums live inside the block file: one writer handles both.
    new BlockInlineChecksumWriter(ds.getReplicaToRead(namespaceId, block)
        .getBlockDataFile(), DataChecksum.CHECKSUM_CRC32, dn.conf.getInt(
        "io.bytes.per.checksum", 512), HdfsConstants.DEFAULT_PACKETSIZE)
        .truncateBlock(newLength);
  } else {
    // Checksums live in a separate meta file that must be found first.
    File metaFile = BlockWithChecksumFileWriter.findMetaFile(blockFile);
    new BlockWithChecksumFileWriter(ds.getReplicaToRead(namespaceId, block)
        .getBlockDataFile(), metaFile).truncateBlock(
        blockFile.length(), newLength);
  }
  // Bring the replica's in-memory size in line with the truncated file.
  ((DatanodeBlockInfo) (ds.getReplicaToRead(namespaceId, block)))
      .syncInMemorySize();
}
/**
 * Truncates a block file to the given length using the checksum writer
 * that matches the block's checksum layout.
 *
 * @param blockFile the block file to truncate
 * @param newLength the length passed to the checksum writer
 * @param useInlineChecksum when true a {@code BlockInlineChecksumWriter}
 *        (CRC32) is used; otherwise the meta file is located and a
 *        {@code BlockWithChecksumFileWriter} is used
 * @param bytesPerChecksum bytes per checksum for the inline writer
 * @throws IOException if locating the meta file or truncating fails
 */
public static void truncateBlockFile(File blockFile, long newLength,
    boolean useInlineChecksum, int bytesPerChecksum) throws IOException {
  if (!useInlineChecksum) {
    final File metaFile = BlockWithChecksumFileWriter.findMetaFile(blockFile);
    final BlockWithChecksumFileWriter separateWriter =
        new BlockWithChecksumFileWriter(
            new BlockDataFile(blockFile, null), metaFile);
    separateWriter.truncateBlock(blockFile.length(), newLength);
    return;
  }

  final BlockInlineChecksumWriter inlineWriter =
      new BlockInlineChecksumWriter(
          new BlockDataFile(blockFile, null),
          DataChecksum.CHECKSUM_CRC32,
          bytesPerChecksum,
          HdfsConstants.DEFAULT_PACKETSIZE);
  inlineWriter.truncateBlock(newLength);
}
/** * Tests positional read in DFS, with quorum reads enabled. */ public void testQuorumPreadDFSBasic() throws IOException { Configuration conf = new Configuration(); conf.setInt(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE, 5); conf.setLong(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THRESHOLD_MILLIS, 100); dfsPreadTest(conf, false); //normal pread dfsPreadTest(conf, true); //trigger read code path without transferTo. }
/**
 * Runs the pread test with a copy of the shared configuration in which the
 * HDFS file system cache is disabled and the quorum read thread pool size
 * is set to 10.
 *
 * The test is run against the modified copy; previously the unmodified
 * shared configuration was passed, so the options were never exercised.
 *
 * @throws Exception if the pread test fails
 */
@Test
public void testQuorumPReadWithOptions() throws Exception {
  Configuration newConf = new Configuration(conf);
  newConf.setBoolean("fs.hdfs.impl.disable.cache", true);
  newConf.setInt(HdfsConstants.DFS_DFSCLIENT_QUORUM_READ_THREADPOOL_SIZE, 10);
  runPreadTest(newConf);
}
@Override public RemoteEditLogManifest getEditLogManifest(long fromTxId) throws IOException { Collection<EditLogLedgerMetadata> ledgers = metadataManager.listLedgers(true);"Ledgers to include in manifest: " + ledgers); List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(ledgers.size()); for (EditLogLedgerMetadata ledger : ledgers) { long endTxId = ledger.getLastTxId(); boolean isInProgress = endTxId == -1; if (isInProgress) { endTxId = validateAndGetEndTxId(ledger); } if (endTxId == HdfsConstants.INVALID_TXID) { continue; } if (ledger.getFirstTxId() >= fromTxId) { ret.add(new RemoteEditLog(ledger.getFirstTxId(), endTxId, isInProgress)); } else if ((fromTxId > ledger.getFirstTxId()) && (fromTxId <= endTxId)) { throw new IOException("Asked for firstTxId " + fromTxId + " which is in the middle of ledger " + ledger); } } Collections.sort(ret); return new RemoteEditLogManifest(ret, false); }
@Test public void testLeaseAfterFailover() throws Exception { String fileName = "/testLeaseAfterFailover"; FSDataOutputStream out = fs.create(new Path(fileName)); byte[] buffer = new byte[1024]; random.nextBytes(buffer); out.write(buffer); out.sync(); FSNamesystem primary = cluster.getPrimaryAvatar(0).avatar.namesystem; // Prevents lease recovery to work. cluster.shutDownDataNodes(); // Expire the lease. primary.leaseManager.setLeasePeriod(0, 0); primary.leaseManager.checkLeases(); cluster.killPrimary(); cluster.restartDataNodes(false); AvatarNode standbyAvatar = cluster.getStandbyAvatar(0).avatar; cluster.failOver(); String lease = standbyAvatar.namesystem.leaseManager .getLeaseByPath(fileName).getHolder(); assertEquals(HdfsConstants.NN_RECOVERY_LEASEHOLDER, lease); }
/**
 * Returns the number of transactions in the inclusive range from the start
 * txid to the end txid.
 *
 * @return {@code (endTxId - startTxId) + 1}, or 0 when either id is
 *         {@code HdfsConstants.INVALID_TXID}
 */
public long getNumTransactions() {
  final boolean bothEndsKnown = startTxId != HdfsConstants.INVALID_TXID
      && endTxId != HdfsConstants.INVALID_TXID;
  return bothEndsKnown ? (endTxId - startTxId) + 1 : 0;
}