/**
 * Fences off any previous writers and claims a unique epoch number that
 * grants this writer access to the journal nodes.
 *
 * <p>The epoch chosen is one greater than the highest last-promised epoch
 * reported by a write quorum of loggers. A write quorum must then accept
 * that epoch via newEpoch() before it is recorded on the logger set.
 *
 * @return the newEpoch() responses gathered from the write quorum, keyed by
 *     logger (the epoch itself can be read back from the logger set)
 * @throws IllegalStateException if an epoch has already been established
 * @throws IOException if either quorum wait fails
 */
Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
    throws IOException {
  Preconditions.checkState(!loggers.isEpochEstablished(),
      "epoch already created");

  Map<AsyncLogger, GetJournalStateResponseProto> journalStates =
      loggers.waitForWriteQuorum(loggers.getJournalState(),
          getJournalStateTimeoutMs, "getJournalState()");

  // Find the highest epoch that any responding logger has promised.
  long highestPromised = Long.MIN_VALUE;
  for (GetJournalStateResponseProto state : journalStates.values()) {
    long promised = state.getLastPromisedEpoch();
    if (promised > highestPromised) {
      highestPromised = promised;
    }
  }
  // Sanity check (only enforced with -ea): a non-empty quorum reporting
  // non-negative promised epochs is expected at this point.
  assert highestPromised >= 0;

  final long proposedEpoch = highestPromised + 1;
  Map<AsyncLogger, NewEpochResponseProto> accepted =
      loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, proposedEpoch),
          newEpochTimeoutMs, "newEpoch(" + proposedEpoch + ")");

  loggers.setEpoch(proposedEpoch);
  return accepted;
}
/**
 * Asks every logger in this set to adopt the given epoch.
 *
 * <p>{@code nsInfo} is not forwarded by this method: each logger's
 * {@code newEpoch} call receives only the epoch.
 *
 * @param nsInfo namespace information (not used here)
 * @param epoch the epoch every logger is asked to adopt
 * @return a quorum call tracking one newEpoch() future per logger
 */
public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
    NamespaceInfo nsInfo, long epoch) {
  Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> pending =
      Maps.newHashMap();
  for (AsyncLogger target : loggers) {
    ListenableFuture<NewEpochResponseProto> future = target.newEpoch(epoch);
    pending.put(target, future);
  }
  return QuorumCall.create(pending);
}
/**
 * Sends a newEpoch RPC for this channel's journal. The RPC runs on the
 * channel's single-threaded executor.
 *
 * @param epoch the epoch to propose to the remote journal node
 * @return a future that completes with the remote node's response, or
 *     fails with the IOException raised by the RPC
 */
@Override
public ListenableFuture<NewEpochResponseProto> newEpoch(
    final long epoch) {
  Callable<NewEpochResponseProto> rpc = new Callable<NewEpochResponseProto>() {
    @Override
    public NewEpochResponseProto call() throws IOException {
      return getProxy().newEpoch(journalId, nsInfo, epoch);
    }
  };
  return singleThreadExecutor.submit(rpc);
}
@Override public void recoverUnfinalizedSegments() throws IOException { Preconditions.checkState(!isActiveWriter, "already active writer"); LOG.info("Starting recovery process for unclosed journal segments..."); Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch(); LOG.info("Successfully started new epoch " + loggers.getEpoch()); if (LOG.isDebugEnabled()) { LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" + QuorumCall.mapToString(resps)); } long mostRecentSegmentTxId = Long.MIN_VALUE; for (NewEpochResponseProto r : resps.values()) { if (r.hasLastSegmentTxId()) { mostRecentSegmentTxId = Math.max(mostRecentSegmentTxId, r.getLastSegmentTxId()); } } // On a completely fresh system, none of the journals have any // segments, so there's nothing to recover. if (mostRecentSegmentTxId != Long.MIN_VALUE) { recoverUnclosedSegment(mostRecentSegmentTxId); } isActiveWriter = true; }
/** * Try to create a new epoch for this journal. * @param nsInfo the namespace, which is verified for consistency or used to * format, if the Journal has not yet been written to. * @param epoch the epoch to start * @return the status information necessary to begin recovery * @throws IOException if the node has already made a promise to another * writer with a higher epoch number, if the namespace is inconsistent, * or if a disk error occurs. */ synchronized NewEpochResponseProto newEpoch( NamespaceInfo nsInfo, long epoch) throws IOException { checkFormatted(); storage.checkConsistentNamespace(nsInfo); // Check that the new epoch being proposed is in fact newer than // any other that we've promised. if (epoch <= getLastPromisedEpoch()) { throw new IOException("Proposed epoch " + epoch + " <= last promise " + getLastPromisedEpoch()); } updateLastPromisedEpoch(epoch); abortCurSegment(); NewEpochResponseProto.Builder builder = NewEpochResponseProto.newBuilder(); EditLogFile latestFile = scanStorageForLatestEdits(); if (latestFile != null) { builder.setLastSegmentTxId(latestFile.getFirstTxId()); } return builder.build(); }
/**
 * Server-side protobuf translator for newEpoch. It unpacks the journal id,
 * namespace info and epoch from the request and delegates to the protocol
 * implementation.
 *
 * @throws ServiceException wrapping any IOException from the implementation
 */
@Override
public NewEpochResponseProto newEpoch(RpcController controller,
    NewEpochRequestProto request) throws ServiceException {
  try {
    String jid = request.getJid().getIdentifier();
    return impl.newEpoch(jid, PBHelper.convert(request.getNsInfo()),
        request.getEpoch());
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}
/**
 * Client-side protobuf translator for newEpoch. It packs the journal id,
 * namespace info and epoch into a request and invokes the RPC proxy.
 *
 * @throws IOException the remote exception unwrapped from a ServiceException
 */
@Override
public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,
    long epoch) throws IOException {
  try {
    NewEpochRequestProto.Builder builder = NewEpochRequestProto.newBuilder();
    builder.setJid(convertJournalId(jid));
    builder.setNsInfo(PBHelper.convert(nsInfo));
    builder.setEpoch(epoch);
    return rpcProxy.newEpoch(NULL_CONTROLLER, builder.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Creates a QuorumJournalManager backed by three mocked loggers.
 *
 * <p>Each logger is stubbed to report a last promised epoch of 0 (HTTP port
 * -1), to answer any newEpoch() with an empty response, and to accept
 * format(). Recovery then runs, so every test starts with the manager as
 * the active writer.
 */
@Before
public void setup() throws Exception {
  spyLoggers = ImmutableList.of(mockLogger(), mockLogger(), mockLogger());

  qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"),
      FAKE_NSINFO) {
    @Override
    protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
      return spyLoggers;
    }
  };

  for (AsyncLogger stub : spyLoggers) {
    GetJournalStateResponseProto journalState =
        GetJournalStateResponseProto.newBuilder()
            .setLastPromisedEpoch(0)
            .setHttpPort(-1)
            .build();
    futureReturns(journalState).when(stub).getJournalState();

    NewEpochResponseProto emptyEpochResponse =
        NewEpochResponseProto.newBuilder().build();
    futureReturns(emptyEpochResponse).when(stub).newEpoch(Mockito.anyLong());

    futureReturns(null).when(stub).format(Mockito.<NamespaceInfo>any());
  }

  qjm.recoverUnfinalizedSegments();
}
@Test(timeout=100000) public void testReturnsSegmentInfoAtEpochTransition() throws Exception { ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get(); ch.sendEdits(1L, 1, 2, QJMTestUtil.createTxnData(1, 2)).get(); // Switch to a new epoch without closing earlier segment NewEpochResponseProto response = ch.newEpoch(2).get(); ch.setEpoch(2); assertEquals(1, response.getLastSegmentTxId()); ch.finalizeLogSegment(1, 2).get(); // Switch to a new epoch after just closing the earlier segment. response = ch.newEpoch(3).get(); ch.setEpoch(3); assertEquals(1, response.getLastSegmentTxId()); // Start a segment but don't write anything, check newEpoch segment info ch.startLogSegment(3, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get(); response = ch.newEpoch(4).get(); ch.setEpoch(4); // Because the new segment is empty, it is equivalent to not having // started writing it. Hence, we should return the prior segment txid. assertEquals(1, response.getLastSegmentTxId()); }
/**
 * Test that, if the writer crashes at the very beginning of a segment,
 * before any transactions are written, the next newEpoch() call
 * returns the prior segment's txid as its most recent segment.
 */
@Test (timeout = 10000)
public void testNewEpochAtBeginningOfSegment() throws Exception {
  // Epoch 1: write and finalize a segment (presumably txns 1-2).
  journal.newEpoch(FAKE_NSINFO, 1);
  journal.startLogSegment(makeRI(1), 1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  journal.journal(makeRI(2), 1, 1, 2, QJMTestUtil.createTxnData(1, 2));
  journal.finalizeLogSegment(makeRI(3), 1, 2);
  // Open a new segment at txid 3 but write nothing to it, simulating a
  // writer that crashed right after starting the segment.
  journal.startLogSegment(makeRI(4), 3, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  // The empty segment should be ignored; the finalized segment at txid 1
  // is expected to be reported as the latest.
  NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
  assertEquals(1, resp.getLastSegmentTxId());
}