/** * Finalize the current log segment. * Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state. */ public synchronized void endCurrentLogSegment(boolean writeEndTxn) { LOG.info("Ending log segment " + curSegmentTxId); Preconditions.checkState(isSegmentOpen(), "Bad state: %s", state); if (writeEndTxn) { logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_END_LOG_SEGMENT)); logSync(); } printStatistics(true); final long lastTxId = getLastWrittenTxId(); try { journalSet.finalizeLogSegment(curSegmentTxId, lastTxId); editLogStream = null; } catch (IOException e) { //All journals have failed, it will be handled in logSync. } state = State.BETWEEN_LOG_SEGMENTS; }
/** * Finalize the current log segment. * Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state. */ synchronized void endCurrentLogSegment(boolean writeEndTxn) { LOG.info("Ending log segment " + curSegmentTxId); Preconditions.checkState(isSegmentOpen(), "Bad state: %s", state); if (writeEndTxn) { logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_END_LOG_SEGMENT)); logSync(); } printStatistics(true); final long lastTxId = getLastWrittenTxId(); try { journalSet.finalizeLogSegment(curSegmentTxId, lastTxId); editLogStream = null; } catch (IOException e) { //All journals have failed, it will be handled in logSync. } state = State.BETWEEN_LOG_SEGMENTS; }
/** * Start writing to the log segment with the given txid. * Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state. */ synchronized void startLogSegment(final long segmentTxId, boolean writeHeaderTxn) throws IOException { LOG.info("Starting log segment at " + segmentTxId); Preconditions.checkArgument(segmentTxId > 0, "Bad txid: %s", segmentTxId); Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS, "Bad state: %s", state); Preconditions.checkState(segmentTxId > curSegmentTxId, "Cannot start writing to log segment " + segmentTxId + " when previous log segment started at " + curSegmentTxId); Preconditions.checkArgument(segmentTxId == txid + 1, "Cannot start log segment at txid %s when next expected " + "txid is %s", segmentTxId, txid + 1); numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0; // TODO no need to link this back to storage anymore! // See HDFS-2174. storage.attemptRestoreRemovedStorage(); try { editLogStream = journalSet.startLogSegment(segmentTxId, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); } catch (IOException ex) { throw new IOException("Unable to start log segment " + segmentTxId + ": too few journals successfully started.", ex); } curSegmentTxId = segmentTxId; state = State.IN_SEGMENT; if (writeHeaderTxn) { logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_START_LOG_SEGMENT)); logSync(); } }
synchronized void startLogSegmentAndWriteHeaderTxn(final long segmentTxId, int layoutVersion) throws IOException { startLogSegment(segmentTxId, layoutVersion); logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_START_LOG_SEGMENT)); logSync(); }
/** * Start writing to the log segment with the given txid. * Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state. */ synchronized void startLogSegment(final long segmentTxId, boolean writeHeaderTxn) throws IOException { LOG.info("Starting log segment at " + segmentTxId); Preconditions.checkArgument(segmentTxId > 0, "Bad txid: %s", segmentTxId); Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS, "Bad state: %s", state); Preconditions.checkState(segmentTxId > curSegmentTxId, "Cannot start writing to log segment " + segmentTxId + " when previous log segment started at " + curSegmentTxId); Preconditions.checkArgument(segmentTxId == txid + 1, "Cannot start log segment at txid %s when next expected " + "txid is %s", segmentTxId, txid + 1); numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0; // TODO no need to link this back to storage anymore! // See HDFS-2174. storage.attemptRestoreRemovedStorage(); try { editLogStream = journalSet.startLogSegment(segmentTxId); } catch (IOException ex) { throw new IOException("Unable to start log segment " + segmentTxId + ": too few journals successfully started.", ex); } curSegmentTxId = segmentTxId; state = State.IN_SEGMENT; if (writeHeaderTxn) { logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_START_LOG_SEGMENT)); logSync(); } }