/** * Refresh, preferably to a known position * @see EditLogInputStream#refresh(long) * @see BookKeeperJournalInputStream#position(long) */ @Override public void refresh(long position, long skippedUntilTxid) throws IOException { checkInitialized(); if (isInProgress()) { // If a ledger is in progress, re-open it for reading in order // to determine the correct bounds of the ledger. LedgerHandle ledger = ledgerProvider.openForReading(ledgerId); journalInputStream.resetLedger(ledger); } // Try to set the underlying stream to the specified position journalInputStream.position(position); // Reload the position tracker and log reader to adjust to the newly // refreshed position bin = new BufferedInputStream(journalInputStream); tracker = new PositionTrackingInputStream(bin, position); DataInputStream in = new DataInputStream(tracker); if (position == 0) { // If we are at the beginning, re-read the version logVersion = readLogVersion(in); } reader = new Reader(in, logVersion); }
public void init() throws IOException { LedgerHandle ledger = ledgerProvider.openForReading(ledgerId); if (!isInProgress() && firstBookKeeperEntry > ledger.getLastAddConfirmed()) { // ledger.getLastAddConfirmed() returns the last quorum-acknowledged entry // id in the ledger. Unless the segment is in-progress, we should throw // an exception if this last entry is lower than the first expected entry id. throw new IllegalArgumentException( "Invalid first BookKeeper entry to read: " + firstBookKeeperEntry + " > last confirmed entry id(" + ledger.getLastAddConfirmed() + ")"); } journalInputStream = new BookKeeperJournalInputStream(ledger, firstBookKeeperEntry); bin = new BufferedInputStream(journalInputStream); tracker = new PositionTrackingInputStream(bin, 0); DataInputStream in = new DataInputStream(tracker); try { logVersion = readLogVersion(in); } catch (EOFException e) { throw new LedgerHeaderCorruptException("No header file in the ledger"); } reader = new Reader(in, logVersion); LOG.info("Reading from ledger id " + ledgerId + ", starting with book keeper entry id " + firstBookKeeperEntry + ", log version " + logVersion + ", first txn id " + firstTxId + (isInProgress() ? ", in-progress log segment" : (" last txn id " + lastTxId)) + "."); }