private void throwIfOutOfSync() throws JournalOutOfSyncException, IOException { if (isOutOfSync()) { // Even if we're out of sync, it's useful to send an RPC // to the remote node in order to update its lag metrics, etc. heartbeatIfNecessary(); throw new JournalOutOfSyncException( "Journal disabled until next roll"); } }
/**
 * Asserts a journal synchronization invariant.
 *
 * @param expression the condition that is required to hold
 * @param msg format string for the failure message, interpreted by
 *        {@link String#format(String, Object...)}
 * @param formatArgs arguments substituted into {@code msg}
 * @throws JournalOutOfSyncException if {@code expression} is false; its
 *         message is {@code msg} formatted with {@code formatArgs}
 */
private void checkSync(boolean expression, String msg,
    Object... formatArgs) throws JournalOutOfSyncException {
  if (expression) {
    return;
  }
  // The message is only formatted on the failure path.
  String detail = String.format(msg, formatArgs);
  throw new JournalOutOfSyncException(detail);
}
/**
 * Finalizing a segment for which no log file exists must be rejected with
 * a {@link JournalOutOfSyncException} that names the missing start txid.
 */
@Test (timeout = 10000)
public void testFinalizeMissingSegment() throws Exception {
  final long missingStartTxId = 1000;
  final long requestedEndTxId = 1001;

  journal.newEpoch(FAKE_NSINFO, 1);
  try {
    journal.finalizeLogSegment(makeRI(1), missingStartTxId, requestedEndTxId);
    // Reaching this line means the journal accepted the bogus finalize.
    fail("did not fail to finalize");
  } catch (JournalOutOfSyncException expected) {
    GenericTestUtils.assertExceptionContains(
        "No log file to finalize at transaction ID 1000", expected);
  }
}
/**
 * Finalizing a segment for which no log file exists must be rejected with
 * a {@link JournalOutOfSyncException} that names the missing start txid.
 */
@Test
public void testFinalizeMissingSegment() throws Exception {
  final long missingStartTxId = 1000;
  final long requestedEndTxId = 1001;

  journal.newEpoch(FAKE_NSINFO, 1);
  try {
    journal.finalizeLogSegment(makeRI(1), missingStartTxId, requestedEndTxId);
    // Reaching this line means the journal accepted the bogus finalize.
    fail("did not fail to finalize");
  } catch (JournalOutOfSyncException expected) {
    GenericTestUtils.assertExceptionContains(
        "No log file to finalize at transaction ID 1000", expected);
  }
}
private void throwIfOutOfSync() throws JournalOutOfSyncException, IOException { if (isOutOfSync()) { // Even if we're out of sync, it's useful to send an RPC // to the remote node in order to update its lag metrics, etc. heartbeatIfNecessary(); throw new JournalOutOfSyncException( "Journal disabled until next roll"); } metrics.setOutOfSync(outOfSync); }
/** * Write a batch of edits to the journal. * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])} */ synchronized void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { checkFormatted(); checkWriteRequest(reqInfo); checkSync(curSegment != null, "Can't write, no segment open"); if (curSegmentTxId != segmentTxId) { // Sanity check: it is possible that the writer will fail IPCs // on both the finalize() and then the start() of the next segment. // This could cause us to continue writing to an old segment // instead of rolling to a new one, which breaks one of the // invariants in the design. If it happens, abort the segment // and throw an exception. JournalOutOfSyncException e = new JournalOutOfSyncException( "Writer out of sync: it thinks it is writing segment " + segmentTxId + " but current segment is " + curSegmentTxId); abortCurSegment(); throw e; } checkSync(nextTxId == firstTxnId, "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); long lastTxnId = firstTxnId + numTxns - 1; if (LOG.isTraceEnabled()) { LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId); } // If the edit has already been marked as committed, we know // it has been fsynced on a quorum of other nodes, and we are // "catching up" with the rest. Hence we do not need to fsync. 
boolean isLagging = lastTxnId <= committedTxnId.get(); boolean shouldFsync = !isLagging; curSegment.writeRaw(records, 0, records.length); curSegment.setReadyToFlush(); StopWatch sw = new StopWatch(); sw.start(); curSegment.flush(shouldFsync); sw.stop(); long nanoSeconds = sw.now(); metrics.addSync( TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS)); long milliSeconds = TimeUnit.MILLISECONDS.convert( nanoSeconds, TimeUnit.NANOSECONDS); if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) { LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId + " took " + milliSeconds + "ms"); } if (isLagging) { // This batch of edits has already been committed on a quorum of other // nodes. So, we are in "catch up" mode. This gets its own metric. metrics.batchesWrittenWhileLagging.incr(1); } metrics.batchesWritten.incr(1); metrics.bytesWritten.incr(records.length); metrics.txnsWritten.incr(numTxns); highestWrittenTxId = lastTxnId; nextTxId = lastTxnId + 1; }
/** * Finalize the log segment at the given transaction ID. */ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { checkFormatted(); checkRequest(reqInfo); boolean needsValidation = true; // Finalizing the log that the writer was just writing. if (startTxId == curSegmentTxId) { if (curSegment != null) { curSegment.close(); curSegment = null; curSegmentTxId = HdfsConstants.INVALID_TXID; } checkSync(nextTxId == endTxId + 1, "Trying to finalize in-progress log segment %s to end at " + "txid %s but only written up to txid %s", startTxId, endTxId, nextTxId - 1); // No need to validate the edit log if the client is finalizing // the log segment that it was just writing to. needsValidation = false; } FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); if (elf == null) { throw new JournalOutOfSyncException("No log file to finalize at " + "transaction ID " + startTxId); } if (elf.isInProgress()) { if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + "finalized"); elf.scanLog(); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + "txid %s but log %s on disk only contains up to txid %s", startTxId, endTxId, elf.getFile(), elf.getLastTxId()); } fjm.finalizeLogSegment(startTxId, endTxId); } else { Preconditions.checkArgument(endTxId == elf.getLastTxId(), "Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId); } // Once logs are finalized, a different length will never be decided. // During recovery, we treat a finalized segment the same as an accepted // recovery. Thus, we no longer need to keep track of the previously- // accepted decision. The existence of the finalized log segment is enough. purgePaxosDecision(elf.getFirstTxId()); }
/** * Write a batch of edits to the journal. * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])} */ synchronized void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { checkFormatted(); checkWriteRequest(reqInfo); checkSync(curSegment != null, "Can't write, no segment open"); if (curSegmentTxId != segmentTxId) { // Sanity check: it is possible that the writer will fail IPCs // on both the finalize() and then the start() of the next segment. // This could cause us to continue writing to an old segment // instead of rolling to a new one, which breaks one of the // invariants in the design. If it happens, abort the segment // and throw an exception. JournalOutOfSyncException e = new JournalOutOfSyncException( "Writer out of sync: it thinks it is writing segment " + segmentTxId + " but current segment is " + curSegmentTxId); abortCurSegment(); throw e; } checkSync(nextTxId == firstTxnId, "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); long lastTxnId = firstTxnId + numTxns - 1; if (LOG.isTraceEnabled()) { LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId); } // If the edit has already been marked as committed, we know // it has been fsynced on a quorum of other nodes, and we are // "catching up" with the rest. Hence we do not need to fsync. 
boolean isLagging = lastTxnId <= committedTxnId.get(); boolean shouldFsync = !isLagging; curSegment.writeRaw(records, 0, records.length); curSegment.setReadyToFlush(); StopWatch sw = new StopWatch(); sw.start(); curSegment.flush(shouldFsync); sw.stop(); long nanoSeconds = sw.now(); metrics.addSync( TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS)); long milliSeconds = TimeUnit.MILLISECONDS.convert( nanoSeconds, TimeUnit.NANOSECONDS); if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) { LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId + " took " + milliSeconds + "ms"); } if (isLagging) { // This batch of edits has already been committed on a quorum of other // nodes. So, we are in "catch up" mode. This gets its own metric. metrics.batchesWrittenWhileLagging.incr(1); } metrics.batchesWritten.incr(1); metrics.bytesWritten.incr(records.length); metrics.txnsWritten.incr(numTxns); updateHighestWrittenTxId(lastTxnId); nextTxId = lastTxnId + 1; lastJournalTimestamp = Time.now(); }
/** * Finalize the log segment at the given transaction ID. */ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { checkFormatted(); checkRequest(reqInfo); boolean needsValidation = true; // Finalizing the log that the writer was just writing. if (startTxId == curSegmentTxId) { if (curSegment != null) { curSegment.close(); curSegment = null; curSegmentTxId = HdfsServerConstants.INVALID_TXID; } checkSync(nextTxId == endTxId + 1, "Trying to finalize in-progress log segment %s to end at " + "txid %s but only written up to txid %s", startTxId, endTxId, nextTxId - 1); // No need to validate the edit log if the client is finalizing // the log segment that it was just writing to. needsValidation = false; } FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); if (elf == null) { throw new JournalOutOfSyncException("No log file to finalize at " + "transaction ID " + startTxId); } if (elf.isInProgress()) { if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + "finalized"); elf.scanLog(Long.MAX_VALUE, false); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + "txid %s but log %s on disk only contains up to txid %s", startTxId, endTxId, elf.getFile(), elf.getLastTxId()); } fjm.finalizeLogSegment(startTxId, endTxId); } else { Preconditions.checkArgument(endTxId == elf.getLastTxId(), "Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId); } // Once logs are finalized, a different length will never be decided. // During recovery, we treat a finalized segment the same as an accepted // recovery. Thus, we no longer need to keep track of the previously- // accepted decision. The existence of the finalized log segment is enough. purgePaxosDecision(elf.getFirstTxId()); }
/** * Write a batch of edits to the journal. * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])} */ synchronized void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { checkFormatted(); checkWriteRequest(reqInfo); checkSync(curSegment != null, "Can't write, no segment open"); if (curSegmentTxId != segmentTxId) { // Sanity check: it is possible that the writer will fail IPCs // on both the finalize() and then the start() of the next segment. // This could cause us to continue writing to an old segment // instead of rolling to a new one, which breaks one of the // invariants in the design. If it happens, abort the segment // and throw an exception. JournalOutOfSyncException e = new JournalOutOfSyncException( "Writer out of sync: it thinks it is writing segment " + segmentTxId + " but current segment is " + curSegmentTxId); abortCurSegment(); throw e; } checkSync(nextTxId == firstTxnId, "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); long lastTxnId = firstTxnId + numTxns - 1; if (LOG.isTraceEnabled()) { LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId); } // If the edit has already been marked as committed, we know // it has been fsynced on a quorum of other nodes, and we are // "catching up" with the rest. Hence we do not need to fsync. boolean isLagging = lastTxnId <= committedTxnId.get(); boolean shouldFsync = !isLagging; curSegment.writeRaw(records, 0, records.length); curSegment.setReadyToFlush(); Stopwatch sw = new Stopwatch(); sw.start(); curSegment.flush(shouldFsync); sw.stop(); metrics.addSync(sw.elapsedTime(TimeUnit.MICROSECONDS)); if (sw.elapsedTime(TimeUnit.MILLISECONDS) > WARN_SYNC_MILLIS_THRESHOLD) { LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId + " took " + sw.elapsedTime(TimeUnit.MILLISECONDS) + "ms"); } if (isLagging) { // This batch of edits has already been committed on a quorum of other // nodes. 
So, we are in "catch up" mode. This gets its own metric. metrics.batchesWrittenWhileLagging.incr(1); } metrics.batchesWritten.incr(1); metrics.bytesWritten.incr(records.length); metrics.txnsWritten.incr(numTxns); highestWrittenTxId = lastTxnId; nextTxId = lastTxnId + 1; }
/** * Write a batch of edits to the journal. * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])} */ synchronized ShortVoid journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { checkJournalStorageFormatted(); checkWriteRequest(reqInfo); if (curSegment == null) { checkSync(false, "Can't write, no segment open"); } if (curSegmentTxId != segmentTxId) { // Sanity check: it is possible that the writer will fail IPCs // on both the finalize() and then the start() of the next segment. // This could cause us to continue writing to an old segment // instead of rolling to a new one, which breaks one of the // invariants in the design. If it happens, abort the segment // and throw an exception. JournalOutOfSyncException e = new JournalOutOfSyncException( "Writer out of sync: it thinks it is writing segment " + segmentTxId + " but current segment is " + curSegmentTxId); abortCurSegment(); throw e; } if (nextTxId != firstTxnId) { checkSync(false, "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); } long lastTxnId = firstTxnId + numTxns - 1; if (LOG.isTraceEnabled()) { LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId); } // If the edit has already been marked as committed, we know // it has been fsynced on a quorum of other nodes, and we are // "catching up" with the rest. Hence we do not need to fsync. 
boolean isLagging = lastTxnId <= committedTxnId.get(); boolean shouldFsync = !isLagging; curSegment.writeRaw(records, 0, records.length); curSegment.setReadyToFlush(); long start = System.nanoTime(); curSegment.flush(shouldFsync); long time = DFSUtil.getElapsedTimeMicroSeconds(start); currentSegmentWrittenBytes += records.length; metrics.addSync(time); if (time / 1000 > WARN_SYNC_MILLIS_THRESHOLD) { LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId + " took " + (time / 1000) + "ms"); } if (isLagging) { // This batch of edits has already been committed on a quorum of other // nodes. So, we are in "catch up" mode. This gets its own metric. metrics.batchesWrittenWhileLagging.inc(1); } metrics.batchesWritten.inc(1); metrics.bytesWritten.inc(records.length); metrics.txnsWritten.inc(numTxns); highestWrittenTxId = lastTxnId; metrics.setLastWrittenTxId(highestWrittenTxId); metrics.setCurrentTxnsLag(getCurrentLagTxns()); nextTxId = lastTxnId + 1; return ShortVoid.instance; }
/** * Finalize the log segment at the given transaction ID. */ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { checkJournalStorageFormatted(); checkRequest(reqInfo); boolean needsValidation = true; // Finalizing the log that the writer was just writing. if (startTxId == curSegmentTxId) { if (curSegment != null) { curSegment.close(); curSegment = null; curSegmentTxId = HdfsConstants.INVALID_TXID; currentSegmentWrittenBytes = 0L; } checkSync(nextTxId == endTxId + 1, "Trying to finalize in-progress log segment %s to end at " + "txid %s but only written up to txid %s", startTxId, endTxId, nextTxId - 1); // No need to validate the edit log if the client is finalizing // the log segment that it was just writing to. needsValidation = false; } FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); if (elf == null) { throw new JournalOutOfSyncException("No log file to finalize at " + "transaction ID " + startTxId); } if (elf.isInProgress()) { if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + "finalized"); elf.validateLog(); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + "txid %s but log %s on disk only contains up to txid %s", startTxId, endTxId, elf.getFile(), elf.getLastTxId()); } fjm.finalizeLogSegment(startTxId, endTxId); } else { Preconditions.checkArgument(endTxId == elf.getLastTxId(), "Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId); } // Once logs are finalized, a different length will never be decided. // During recovery, we treat a finalized segment the same as an accepted // recovery. Thus, we no longer need to keep track of the previously- // accepted decision. The existence of the finalized log segment is enough. purgePaxosDecision(elf.getFirstTxId()); }