/** * Construct BookKeeper edit log input stream. * Starts reading from firstBookKeeperEntry. This allows the stream * to take a shortcut during recovery, as it doesn't have to read * every edit log transaction to find out what the last one is. */ BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata, long firstBookKeeperEntry) throws IOException { this.lh = lh; this.firstTxId = metadata.getFirstTxId(); this.lastTxId = metadata.getLastTxId(); this.logVersion = metadata.getDataLayoutVersion(); this.inProgress = metadata.isInProgress(); if (firstBookKeeperEntry < 0 || firstBookKeeperEntry > lh.getLastAddConfirmed()) { throw new IOException("Invalid first bk entry to read: " + firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed()); } BufferedInputStream bin = new BufferedInputStream( new LedgerInputStream(lh, firstBookKeeperEntry)); tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); DataInputStream in = new DataInputStream(tracker); reader = new FSEditLogOp.Reader(in, tracker, logVersion); }
/** * Construct BookKeeper edit log input stream. * Starts reading from firstBookKeeperEntry. This allows the stream * to take a shortcut during recovery, as it doesn't have to read * every edit log transaction to find out what the last one is. */ BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata, long firstBookKeeperEntry) throws IOException { this.lh = lh; this.firstTxId = metadata.getFirstTxId(); this.lastTxId = metadata.getLastTxId(); this.logVersion = metadata.getDataLayoutVersion(); this.inProgress = metadata.isInProgress(); if (firstBookKeeperEntry < 0 || firstBookKeeperEntry > lh.getLastAddConfirmed()) { throw new IOException("Invalid first bk entry to read: " + firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed()); } BufferedInputStream bin = new BufferedInputStream( new LedgerInputStream(lh, firstBookKeeperEntry)); tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); DataInputStream in = new DataInputStream(tracker); reader = FSEditLogOp.Reader.create(in, tracker, logVersion); }
public static FSEditLogLoader.EditLogValidation validateEditLog( LedgerHandleProvider ledgerProvider, EditLogLedgerMetadata ledgerMetadata) throws IOException { BookKeeperEditLogInputStream in; try { in = new BookKeeperEditLogInputStream(ledgerProvider, ledgerMetadata.getLedgerId(), 0, ledgerMetadata.getFirstTxId(), ledgerMetadata.getLastTxId(), ledgerMetadata.getLastTxId() == -1); } catch (LedgerHeaderCorruptException e) { LOG.warn("Log at ledger id" + ledgerMetadata.getLedgerId() + " has no valid header", e); return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true); } try { return FSEditLogLoader.validateEditLog(in); } finally { IOUtils.closeStream(in); } }
long validateAndGetEndTxId(EditLogLedgerMetadata ledger, boolean fence) throws IOException { FSEditLogLoader.EditLogValidation val; if (!fence) { val = BookKeeperEditLogInputStream.validateEditLog(this, ledger); } else { val = BookKeeperEditLogInputStream.validateEditLog( new FencingLedgerHandleProvider(), ledger); } InjectionHandler.processEvent(InjectionEvent.BKJM_VALIDATELOGSEGMENT, val); if (val.getNumTransactions() == 0) { return HdfsConstants.INVALID_TXID; // Ledger is corrupt } return val.getEndTxId(); }
@Test public void testFailoverRetryStandbyQuiesce() throws Exception { setUp("testFailoverRetryBlocksMisMatch", false); int totalBlocks = 50; DFSTestUtil.createFile(fs, new Path("/testFailoverRetryBlocksMisMatch"), (long) totalBlocks * 1024, (short) 3, System.currentTimeMillis()); // This should shutdown the standby. cluster.getStandbyAvatar(0).avatar .quiesceStandby(FSEditLogLoader.TXID_IGNORE); while (!restartDone) { LOG.info("Waiting for restart.."); Thread.sleep(1000); } try { tryConnect(cluster.getStandbyAvatar(0).avatar); fail("Did not throw exception"); } catch (Exception e) { LOG.warn("Expected exception : ", e); } }
URLLogInputStream(AsyncLogger logger, long firstTxId, int httpTimeout) throws LogHeaderCorruptException, IOException { log = new URLLog(logger, firstTxId, httpTimeout); fStream = log.getInputStream(0); lastValidTxId = log.getLastValidTxId(); super.setIsInProgress(log.getIsInProgress()); BufferedInputStream bin = new BufferedInputStream(fStream); tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); DataInputStream in = new DataInputStream(tracker); try { logVersion = readLogVersion(in); } catch (EOFException eofe) { throw new LogHeaderCorruptException("No header found in log"); } reader = new FSEditLogOp.Reader(in, logVersion); this.firstTxId = firstTxId; this.disabled = false; // set initial position (version is 4 bytes) this.currentPosition = tracker.getPos(); }
@Override protected void _processEvent(InjectionEventI event, Object... args) { if (event == InjectionEvent.BKJM_STARTLOGSEGMENT) { ledgerForStartedSegment = (EditLogLedgerMetadata) args[0]; } else if (event == InjectionEvent.BKJM_VALIDATELOGSEGMENT) { logValidation = (FSEditLogLoader.EditLogValidation) args[0]; } }
protected static TestAvatarCheckpointingHandler testQuiesceInterruption( InjectionEvent stopOnEvent, InjectionEvent waitUntilEvent, boolean scf, boolean testCancellation, boolean rollAfterQuiesce, boolean enableQJM) throws Exception { LOG.info("TEST Quiesce during checkpoint : " + stopOnEvent + " waiting on: " + waitUntilEvent); TestAvatarCheckpointingHandler h = new TestAvatarCheckpointingHandler( stopOnEvent, waitUntilEvent, scf); InjectionHandler.set(h); setUp(3, "testQuiesceInterruption", false, enableQJM); //simulate interruption, no ckpt failure AvatarNode primary = cluster.getPrimaryAvatar(0).avatar; AvatarNode standby = cluster.getStandbyAvatar(0).avatar; createEdits(40); while (!h.receivedEvents.contains(stopOnEvent)) { LOG.info("Waiting for event : " + stopOnEvent); Thread.sleep(1000); } if (!enableQJM) { standby.quiesceStandby(getCurrentTxId(primary)-1); // only assert this for FileJournalManager. // edits + SLS + ELS + SLS (checkpoint fails, but roll happened) assertEquals(43, getCurrentTxId(primary)); // if quiesce happened before roll, the standby will be behind by 1 transaction // which will be reclaimed by opening the log after long extraTransaction = rollAfterQuiesce ? 1 : 0; assertEquals(getCurrentTxId(primary), getCurrentTxId(standby) + extraTransaction); } else { standby.quiesceStandby(FSEditLogLoader.TXID_IGNORE); } // make sure the checkpoint indeed failed assertTrue(h.receivedEvents .contains(InjectionEvent.STANDBY_EXIT_CHECKPOINT_EXCEPTION)); if (testCancellation) { assertTrue(h.receivedEvents .contains(InjectionEvent.SAVE_NAMESPACE_CONTEXT_EXCEPTION)); } return h; }