static EditLogInputStream getJournalInputStreamDontCheckLastTxId( JournalManager jm, long txId) throws IOException { List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); jm.selectInputStreams(streams, txId, true, false); if (streams.size() < 1) { throw new IOException("Cannot obtain stream for txid: " + txId); } Collections.sort(streams, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); if (txId == HdfsConstants.INVALID_TXID) { return streams.get(0); } for (EditLogInputStream elis : streams) { if (elis.getFirstTxId() == txId) { return elis; } } throw new IOException("Cannot obtain stream for txid: " + txId); }
@Test public void testSBNCheckpoints() throws Exception { JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(nn1); doEdits(0, 10); HATestUtil.waitForStandbyToCatchUp(nn0, nn1); // Once the standby catches up, it should notice that it needs to // do a checkpoint and save one to its local directories. HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12)); // It should also upload it back to the active. HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12)); // The standby should never try to purge edit logs on shared storage. Mockito.verify(standbyJournalSet, Mockito.never()). purgeLogsOlderThan(Mockito.anyLong()); }
@Override public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxnId, boolean inProgressOk) throws IOException { QuorumCall<AsyncLogger, RemoteEditLogManifest> q = loggers.getEditLogManifest(fromTxnId, inProgressOk); Map<AsyncLogger, RemoteEditLogManifest> resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs, "selectInputStreams"); LOG.debug("selectInputStream manifests:\n" + Joiner.on("\n").withKeyValueSeparator(": ").join(resps)); final PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<EditLogInputStream>(64, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) { AsyncLogger logger = e.getKey(); RemoteEditLogManifest manifest = e.getValue(); for (RemoteEditLog remoteLog : manifest.getLogs()) { URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId()); EditLogInputStream elis = EditLogFileInputStream.fromUrl( connectionFactory, url, remoteLog.getStartTxId(), remoteLog.getEndTxId(), remoteLog.isInProgress()); allStreams.add(elis); } } JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId); }
/** * Setup the input stream to be consumed by the reader. The input stream * corresponds to a single segment. */ private void setupCurrentEditStream(long txid) throws IOException { // get new stream currentEditLogInputStream = JournalSet.getInputStream(remoteJournalManager, txid); // we just started a new log segment currentSegmentTxId = txid; // indicate that we successfully reopened the stream mostRecentlyReadTransactionTime = now(); }
/** * Select input streams. * inProgressOk should be true only for tailing, not for startup */ @Override public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxnId, boolean inProgressOk, boolean validateInProgressSegments) throws IOException { QuorumCall<AsyncLogger, RemoteEditLogManifest> q = loggers.getEditLogManifest(fromTxnId); // we insist on getting all responses, even if they are to be exceptions // this will fail if we cannot get majority of successes Map<AsyncLogger, RemoteEditLogManifest> resps = loggers .waitForReadQuorumWithAllResponses(q, selectInputStreamsTimeoutMs, "selectInputStreams"); if(LOG.isDebugEnabled()) { LOG.debug("selectInputStream manifests:\n" + Joiner.on("\n").withKeyValueSeparator(": ").join(resps)); } final PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<EditLogInputStream>(64, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) { AsyncLogger logger = e.getKey(); RemoteEditLogManifest manifest = e.getValue(); for (RemoteEditLog remoteLog : manifest.getLogs()) { EditLogInputStream elis = new URLLogInputStream(logger, remoteLog.getStartTxId(), httpConnectReadTimeoutMs); if (elis.isInProgress() && !inProgressOk) { continue; } allStreams.add(elis); } } // we pass 0 as min redundance as we do not care about this here JournalSet.chainAndMakeRedundantStreams( streams, allStreams, fromTxnId, inProgressOk, 0); }
@Override public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException { QuorumCall<AsyncLogger, RemoteEditLogManifest> q = loggers.getEditLogManifest(fromTxnId, forReading); Map<AsyncLogger, RemoteEditLogManifest> resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs, "selectInputStreams"); LOG.debug("selectInputStream manifests:\n" + Joiner.on("\n").withKeyValueSeparator(": ").join(resps)); final PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<EditLogInputStream>(64, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) { AsyncLogger logger = e.getKey(); RemoteEditLogManifest manifest = e.getValue(); for (RemoteEditLog remoteLog : manifest.getLogs()) { URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId()); EditLogInputStream elis = EditLogFileInputStream.fromUrl( url, remoteLog.getStartTxId(), remoteLog.getEndTxId(), remoteLog.isInProgress()); allStreams.add(elis); } } JournalSet.chainAndMakeRedundantStreams( streams, allStreams, fromTxnId, inProgressOk); }
@Override public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException { QuorumCall<AsyncLogger, RemoteEditLogManifest> q = loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk); Map<AsyncLogger, RemoteEditLogManifest> resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs, "selectInputStreams"); LOG.debug("selectInputStream manifests:\n" + Joiner.on("\n").withKeyValueSeparator(": ").join(resps)); final PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<EditLogInputStream>(64, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) { AsyncLogger logger = e.getKey(); RemoteEditLogManifest manifest = e.getValue(); for (RemoteEditLog remoteLog : manifest.getLogs()) { URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId()); EditLogInputStream elis = EditLogFileInputStream.fromUrl( url, remoteLog.getStartTxId(), remoteLog.getEndTxId(), remoteLog.isInProgress()); allStreams.add(elis); } } JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId); }