/** * @return the current state of the given segment, or null if the * segment does not exist. */ @VisibleForTesting SegmentStateProto getSegmentInfo(long segmentTxId) throws IOException { EditLogFile elf = fjm.getLogFile(segmentTxId); if (elf == null) { return null; } if (elf.isInProgress()) { elf.scanLog(); } if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) { LOG.info("Edit log file " + elf + " appears to be empty. " + "Moving it aside..."); elf.moveAsideEmptyFile(); return null; } SegmentStateProto ret = SegmentStateProto.newBuilder() .setStartTxId(segmentTxId) .setEndTxId(elf.getLastTxId()) .setIsInProgress(elf.isInProgress()) .build(); LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + TextFormat.shortDebugString(ret)); return ret; }
/** * Test whether JNs can correctly handle editlog that cannot be decoded. */ @Test public void testScanEditLog() throws Exception { // use a future layout version journal.startLogSegment(makeRI(1), 1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1); // in the segment we write garbage editlog, which can be scanned but // cannot be decoded final int numTxns = 5; byte[] ops = QJMTestUtil.createGabageTxns(1, 5); journal.journal(makeRI(2), 1, 1, numTxns, ops); // verify the in-progress editlog segment SegmentStateProto segmentState = journal.getSegmentInfo(1); assertTrue(segmentState.getIsInProgress()); Assert.assertEquals(numTxns, segmentState.getEndTxId()); Assert.assertEquals(1, segmentState.getStartTxId()); // finalize the segment and verify it again journal.finalizeLogSegment(makeRI(3), 1, numTxns); segmentState = journal.getSegmentInfo(1); assertFalse(segmentState.getIsInProgress()); Assert.assertEquals(numTxns, segmentState.getEndTxId()); Assert.assertEquals(1, segmentState.getStartTxId()); }
/** * @return the current state of the given segment, or null if the * segment does not exist. */ @VisibleForTesting SegmentStateProto getSegmentInfo(long segmentTxId) throws IOException { EditLogFile elf = fjm.getLogFile(segmentTxId); if (elf == null) { return null; } if (elf.isInProgress()) { elf.scanLog(Long.MAX_VALUE, false); } if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) { LOG.info("Edit log file " + elf + " appears to be empty. " + "Moving it aside..."); elf.moveAsideEmptyFile(); return null; } SegmentStateProto ret = SegmentStateProto.newBuilder() .setStartTxId(segmentTxId) .setEndTxId(elf.getLastTxId()) .setIsInProgress(elf.isInProgress()) .build(); LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + TextFormat.shortDebugString(ret)); return ret; }
/** * @return the current state of the given segment, or null if the * segment does not exist. */ private SegmentStateProto getSegmentInfo(long segmentTxId) throws IOException { EditLogFile elf = fjm.getLogFile(segmentTxId); if (elf == null) { return null; } if (elf.isInProgress()) { elf.validateLog(); } if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) { LOG.info("Edit log file " + elf + " appears to be empty. " + "Moving it aside..."); elf.moveAsideEmptyFile(); return null; } SegmentStateProto ret = new SegmentStateProto(segmentTxId, elf.getLastTxId(), elf.isInProgress()); LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + ret); return ret; }
/** * @return the current state of the given segment, or null if the * segment does not exist. */ private SegmentStateProto getSegmentInfo(long segmentTxId) throws IOException { EditLogFile elf = fjm.getLogFile(segmentTxId); if (elf == null) { return null; } if (elf.isInProgress()) { elf.validateLog(); } if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) { LOG.info("Edit log file " + elf + " appears to be empty. " + "Moving it aside..."); elf.moveAsideEmptyFile(); return null; } SegmentStateProto ret = SegmentStateProto.newBuilder() .setStartTxId(segmentTxId) .setEndTxId(elf.getLastTxId()) .setIsInProgress(elf.isInProgress()) .build(); LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + TextFormat.shortDebugString(ret)); return ret; }
QuorumCall<AsyncLogger,Void> acceptRecovery(SegmentStateProto log, URL fromURL) { Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap(); for (AsyncLogger logger : loggers) { ListenableFuture<Void> future = logger.acceptRecovery(log, fromURL); calls.put(logger, future); } return QuorumCall.create(calls); }
@Override public ListenableFuture<Void> acceptRecovery( final SegmentStateProto log, final URL url) { return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().acceptRecovery(createReqInfo(), log, url); return null; } }); }
/** * Synchronize a log segment from another JournalNode. The log is * downloaded from the provided URL into a temporary location on disk, * which is named based on the current request's epoch. * * @return the temporary location of the downloaded file */ private File syncLog(RequestInfo reqInfo, final SegmentStateProto segment, final URL url) throws IOException { final File tmpFile = storage.getSyncLogTemporaryFile( segment.getStartTxId(), reqInfo.getEpoch()); final List<File> localPaths = ImmutableList.of(tmpFile); LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + " from " + url); SecurityUtil.doAsLoginUser( new PrivilegedExceptionAction<Void>() { @Override public Void run() throws IOException { // We may have lost our ticket since last checkpoint, log in again, just in case if (UserGroupInformation.isSecurityEnabled()) { UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); } boolean success = false; try { TransferFsImage.doGetUrl(url, localPaths, storage, true); assert tmpFile.exists(); success = true; } finally { if (!success) { if (!tmpFile.delete()) { LOG.warn("Failed to delete temporary file " + tmpFile); } } } return null; } }); return tmpFile; }
@Override public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto stateToAccept, URL fromUrl) throws IOException { try { rpcProxy.acceptRecovery(NULL_CONTROLLER, AcceptRecoveryRequestProto.newBuilder() .setReqInfo(convert(reqInfo)) .setStateToAccept(stateToAccept) .setFromURL(fromUrl.toExternalForm()) .build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
QuorumCall<AsyncLogger,Void> acceptRecovery(SegmentStateProto log, String fromURL) { Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap(); for (AsyncLogger logger : loggers) { ListenableFuture<Void> future = logger.acceptRecovery(log, fromURL); calls.put(logger, future); } return QuorumCall.create(calls); }
@Override public ListenableFuture<Void> acceptRecovery( final SegmentStateProto log, final String url) { return executor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().acceptRecovery(createReqInfo(), log, url); return null; } }); }
/** * @see QJournalProtocol#prepareRecovery(RequestInfo, long) */ public synchronized PrepareRecoveryResponseProto prepareRecovery( RequestInfo reqInfo, long segmentTxId) throws IOException { checkJournalStorageFormatted(); checkRequest(reqInfo); abortCurSegment(); PrepareRecoveryResponseProto ret = new PrepareRecoveryResponseProto(); PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId); completeHalfDoneAcceptRecovery(previouslyAccepted); SegmentStateProto segInfo = getSegmentInfo(segmentTxId); boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress(); if (previouslyAccepted != null && !hasFinalizedSegment) { ret.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch()); ret.setSegmentState(previouslyAccepted.getSegmentState()); } else { if (segInfo != null) { ret.setSegmentState(segInfo); } } ret.setLastWriterEpoch(lastWriterEpoch.get()); if (committedTxnId.get() != HdfsConstants.INVALID_TXID) { ret.setLastCommittedTxId(committedTxnId.get()); } LOG.info("Prepared recovery for segment " + segmentTxId + ": " + ret); return ret; }
/** * Synchronize a log segment from another JournalNode. The log is * downloaded from the provided URL into a temporary location on disk, * which is named based on the current request's epoch. * * @return the temporary location of the downloaded file */ File syncLog(RequestInfo reqInfo, final SegmentStateProto segment, final URL url) throws IOException { long startTxId = segment.getStartTxId(); long epoch = reqInfo.getEpoch(); return syncLog(epoch, segment.getStartTxId(), url, segment.toString(), journalStorage.getSyncLogTemporaryFile(startTxId, epoch)); }
@Override public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log, String fromUrl) throws IOException { jn.getOrCreateJournal(reqInfo.getJournalId()) .acceptRecovery(reqInfo, log, new URL(fromUrl)); }
@Override public ListenableFuture<Void> acceptRecovery( final SegmentStateProto log, final URL url) { return executor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().acceptRecovery(createReqInfo(), log, url); return null; } }); }