/**
 * Verifies that at least a quorum of the cluster's nodes hold a finalized
 * log segment starting at {@code segmentTxId}, and that every finalized copy
 * ends at {@code expectedEndTxId}. Nodes that have no such segment, or whose
 * copy is still in progress, are not counted.
 *
 * @param cluster the mini journal cluster to inspect
 * @param segmentTxId first transaction ID of the segment to look up
 * @param expectedEndTxId last transaction ID each finalized copy must have
 * @throws IOException if looking up a log file fails
 */
private void checkRecovery(MiniJournalCluster cluster, long segmentTxId,
    long expectedEndTxId) throws IOException {
  int finalizedCount = 0;

  for (int nodeIdx = 0; nodeIdx < cluster.getNumNodes(); nodeIdx++) {
    File currentDir = cluster.getCurrentDir(nodeIdx, JID);
    EditLogFile segment =
        FileJournalManager.getLogFile(currentDir, segmentTxId);

    // Only finalized segments count toward the quorum.
    if (segment == null || segment.isInProgress()) {
      continue;
    }

    finalizedCount++;
    if (segment.getLastTxId() != expectedEndTxId) {
      fail("File " + segment + " finalized to wrong txid, expected "
          + expectedEndTxId);
    }
  }

  if (finalizedCount < cluster.getQuorumSize()) {
    fail("Did not find a quorum of finalized logs starting at "
        + segmentTxId);
  }
}
/**
 * Verifies that at least a quorum of the cluster's nodes hold a finalized
 * log segment starting at {@code segmentTxId}, and that every finalized copy
 * ends at {@code expectedEndTxId}. Nodes that have no such segment, or whose
 * copy is still in progress, are not counted.
 *
 * @param cluster the mini journal cluster to inspect
 * @param segmentTxId first transaction ID of the segment to look up
 * @param expectedEndTxId last transaction ID each finalized copy must have
 * @throws IOException if looking up a log file fails
 */
private void checkRecovery(MiniJournalCluster cluster, long segmentTxId,
    long expectedEndTxId) throws IOException {
  int finalizedCount = 0;

  for (int nodeIdx = 0; nodeIdx < cluster.getNumNodes(); nodeIdx++) {
    File journalDir = cluster.getJournalCurrentDir(nodeIdx, JID);
    EditLogFile segment =
        FileJournalManager.getLogFile(journalDir, segmentTxId);

    // Only finalized segments count toward the quorum.
    if (segment == null || segment.isInProgress()) {
      continue;
    }

    finalizedCount++;
    if (segment.getLastTxId() != expectedEndTxId) {
      fail("File " + segment + " finalized to wrong txid, expected "
          + expectedEndTxId);
    }
  }

  if (finalizedCount < cluster.getQuorumSize()) {
    fail("Did not find a quorum of finalized logs starting at "
        + segmentTxId);
  }
}
Journal(File logDir, File imageDir, String journalId, StorageErrorReporter errorReporter, JournalNode journalNode) throws IOException { this.journalNode = journalNode; // initialize storage directories journalStorage = new JNStorage(logDir, errorReporter, false, journalNode.getConf()); imageStorage = new JNStorage(imageDir, errorReporter, true, journalNode.getConf()); // initialize journal and image managers this.fjm = new FileJournalManager(journalStorage.getSingularStorageDir(), null, errorReporter); this.imageManager = new FileImageManager( imageStorage.getStorageDirectory(), imageStorage); this.journalId = journalId; this.metrics = JournalMetrics.create(this); refreshCachedData(); EditLogFile latest = scanStorageForLatestEdits(); if (latest != null) { highestWrittenTxId = latest.getLastTxId(); metrics.setLastWrittenTxId(highestWrittenTxId); } }
/**
 * Sets up journal-node storage over a single directory, creates the file
 * journal manager for it, and then analyzes/recovers the storage using the
 * given startup option.
 *
 * @param conf Configuration object, passed on to the journal manager
 * @param logDir the path to the directory in which data will be stored
 * @param startOpt startup option forwarded to storage analysis/recovery
 * @param errorReporter a callback to report errors
 * @throws IOException if the storage cannot be set up, analyzed or recovered
 */
protected JNStorage(Configuration conf, File logDir, StartupOption startOpt,
    StorageErrorReporter errorReporter) throws IOException {
  super(NodeType.JOURNAL_NODE);

  this.sd = new StorageDirectory(logDir);
  addStorageDir(this.sd);
  this.fjm = new FileJournalManager(conf, this.sd, errorReporter);

  analyzeAndRecoverStorage(startOpt);
}
/**
 * Sets up journal-node storage over a single directory, creates the file
 * journal manager for it, and then analyzes the storage.
 *
 * @param conf Configuration object, passed on to the journal manager
 * @param logDir the path to the directory in which data will be stored
 * @param errorReporter a callback to report errors
 * @throws IOException if the storage cannot be set up or analyzed
 */
protected JNStorage(Configuration conf, File logDir,
    StorageErrorReporter errorReporter) throws IOException {
  super(NodeType.JOURNAL_NODE);

  this.sd = new StorageDirectory(logDir);
  addStorageDir(this.sd);
  this.fjm = new FileJournalManager(conf, this.sd, errorReporter);

  analyzeStorage();
}
/** * Finalize the log segment at the given transaction ID. */ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { checkFormatted(); checkRequest(reqInfo); boolean needsValidation = true; // Finalizing the log that the writer was just writing. if (startTxId == curSegmentTxId) { if (curSegment != null) { curSegment.close(); curSegment = null; curSegmentTxId = HdfsConstants.INVALID_TXID; } checkSync(nextTxId == endTxId + 1, "Trying to finalize in-progress log segment %s to end at " + "txid %s but only written up to txid %s", startTxId, endTxId, nextTxId - 1); // No need to validate the edit log if the client is finalizing // the log segment that it was just writing to. needsValidation = false; } FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); if (elf == null) { throw new JournalOutOfSyncException("No log file to finalize at " + "transaction ID " + startTxId); } if (elf.isInProgress()) { if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + "finalized"); elf.scanLog(); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + "txid %s but log %s on disk only contains up to txid %s", startTxId, endTxId, elf.getFile(), elf.getLastTxId()); } fjm.finalizeLogSegment(startTxId, endTxId); } else { Preconditions.checkArgument(endTxId == elf.getLastTxId(), "Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId); } // Once logs are finalized, a different length will never be decided. // During recovery, we treat a finalized segment the same as an accepted // recovery. Thus, we no longer need to keep track of the previously- // accepted decision. The existence of the finalized log segment is enough. purgePaxosDecision(elf.getFirstTxId()); }
/**
 * @return the file journal manager held by this object
 */
FileJournalManager getJournalManager() {
  return this.fjm;
}
@Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { FileInputStream editFileIn = null; try { final ServletContext context = getServletContext(); final Configuration conf = (Configuration) getServletContext() .getAttribute(JspHelper.CURRENT_CONF); final String journalId = request.getParameter(JOURNAL_ID_PARAM); QuorumJournalManager.checkJournalId(journalId); final JNStorage storage = JournalNodeHttpServer .getJournalFromContext(context, journalId).getStorage(); // Check security if (!checkRequestorOrSendError(conf, request, response)) { return; } // Check that the namespace info is correct if (!checkStorageInfoOrSendError(storage, request, response)) { return; } long segmentTxId = ServletUtil.parseLongParam(request, SEGMENT_TXID_PARAM); FileJournalManager fjm = storage.getJournalManager(); File editFile; synchronized (fjm) { // Synchronize on the FJM so that the file doesn't get finalized // out from underneath us while we're in the process of opening // it up. EditLogFile elf = fjm.getLogFile( segmentTxId); if (elf == null) { response.sendError(HttpServletResponse.SC_NOT_FOUND, "No edit log found starting at txid " + segmentTxId); return; } editFile = elf.getFile(); ImageServlet.setVerificationHeadersForGet(response, editFile); ImageServlet.setFileNameHeaders(response, editFile); editFileIn = new FileInputStream(editFile); } DataTransferThrottler throttler = ImageServlet.getThrottler(conf); // send edits TransferFsImage.copyFileToStream(response.getOutputStream(), editFile, editFileIn, throttler); } catch (Throwable t) { String errMsg = "getedit failed. " + StringUtils.stringifyException(t); response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg); throw new IOException(errMsg); } finally { IOUtils.closeStream(editFileIn); } }
/** * Finalize the log segment at the given transaction ID. */ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { checkFormatted(); checkRequest(reqInfo); boolean needsValidation = true; // Finalizing the log that the writer was just writing. if (startTxId == curSegmentTxId) { if (curSegment != null) { curSegment.close(); curSegment = null; curSegmentTxId = HdfsServerConstants.INVALID_TXID; } checkSync(nextTxId == endTxId + 1, "Trying to finalize in-progress log segment %s to end at " + "txid %s but only written up to txid %s", startTxId, endTxId, nextTxId - 1); // No need to validate the edit log if the client is finalizing // the log segment that it was just writing to. needsValidation = false; } FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); if (elf == null) { throw new JournalOutOfSyncException("No log file to finalize at " + "transaction ID " + startTxId); } if (elf.isInProgress()) { if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + "finalized"); elf.scanLog(Long.MAX_VALUE, false); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + "txid %s but log %s on disk only contains up to txid %s", startTxId, endTxId, elf.getFile(), elf.getLastTxId()); } fjm.finalizeLogSegment(startTxId, endTxId); } else { Preconditions.checkArgument(endTxId == elf.getLastTxId(), "Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId); } // Once logs are finalized, a different length will never be decided. // During recovery, we treat a finalized segment the same as an accepted // recovery. Thus, we no longer need to keep track of the previously- // accepted decision. The existence of the finalized log segment is enough. purgePaxosDecision(elf.getFirstTxId()); }
/** * Finalize the log segment at the given transaction ID. */ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { checkJournalStorageFormatted(); checkRequest(reqInfo); boolean needsValidation = true; // Finalizing the log that the writer was just writing. if (startTxId == curSegmentTxId) { if (curSegment != null) { curSegment.close(); curSegment = null; curSegmentTxId = HdfsConstants.INVALID_TXID; currentSegmentWrittenBytes = 0L; } checkSync(nextTxId == endTxId + 1, "Trying to finalize in-progress log segment %s to end at " + "txid %s but only written up to txid %s", startTxId, endTxId, nextTxId - 1); // No need to validate the edit log if the client is finalizing // the log segment that it was just writing to. needsValidation = false; } FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); if (elf == null) { throw new JournalOutOfSyncException("No log file to finalize at " + "transaction ID " + startTxId); } if (elf.isInProgress()) { if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + "finalized"); elf.validateLog(); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + "txid %s but log %s on disk only contains up to txid %s", startTxId, endTxId, elf.getFile(), elf.getLastTxId()); } fjm.finalizeLogSegment(startTxId, endTxId); } else { Preconditions.checkArgument(endTxId == elf.getLastTxId(), "Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId); } // Once logs are finalized, a different length will never be decided. // During recovery, we treat a finalized segment the same as an accepted // recovery. Thus, we no longer need to keep track of the previously- // accepted decision. The existence of the finalized log segment is enough. purgePaxosDecision(elf.getFirstTxId()); }
/**
 * @return the file journal manager held by this object
 */
public FileJournalManager getJournalManager() {
  return this.fjm;
}
@Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { try { final ServletContext context = getServletContext(); final String journalId = request .getParameter(GetJournalEditServlet.JOURNAL_ID_PARAM); QuorumJournalManager.checkJournalId(journalId); final Journal journal = JournalNodeHttpServer.getJournalFromContext( context, journalId); final JNStorage storage = journal.getJournalStorage(); // Check that the namespace info is correct if (!GetJournalEditServlet.checkStorageInfoOrSendError(storage, request, response)) { return; } long startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM); FileJournalManager fjm = journal.getJournalManager(); LOG.info("getJournalManifest request: journalId " + journalId + ", start txid: " + startTxId + ", storage: " + storage.toColonSeparatedString()); String output = JSON.toString(new ArrayList<String>()); synchronized (journal) { // Synchronize on the journal so that the files do not change // get all log segments List<EditLogFile> logFiles = fjm.getLogFiles(startTxId, false); List<String> manifest = new ArrayList<String>(); for (EditLogFile elf : logFiles) { manifest.add(elf.toColonSeparatedString()); } output = JSON.toString(manifest); } JournalNodeHttpServer.sendResponse(output, response); } catch (Throwable t) { GetJournalEditServlet.handleFailure(t, response, "getJournalManifest"); } }
@Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { FileInputStream editFileIn = null; try { final ServletContext context = getServletContext(); final Configuration conf = (Configuration) getServletContext() .getAttribute(JspHelper.CURRENT_CONF); final String journalId = request.getParameter(JOURNAL_ID_PARAM); QuorumJournalManager.checkJournalId(journalId); final JNStorage storage = JournalNodeHttpServer .getJournalFromContext(context, journalId).getStorage(); // Check security if (!checkRequestorOrSendError(conf, request, response)) { return; } // Check that the namespace info is correct if (!checkStorageInfoOrSendError(storage, request, response)) { return; } long segmentTxId = ServletUtil.parseLongParam(request, SEGMENT_TXID_PARAM); FileJournalManager fjm = storage.getJournalManager(); File editFile; synchronized (fjm) { // Synchronize on the FJM so that the file doesn't get finalized // out from underneath us while we're in the process of opening // it up. EditLogFile elf = fjm.getLogFile( segmentTxId); if (elf == null) { response.sendError(HttpServletResponse.SC_NOT_FOUND, "No edit log found starting at txid " + segmentTxId); return; } editFile = elf.getFile(); GetImageServlet.setVerificationHeaders(response, editFile); GetImageServlet.setFileNameHeaders(response, editFile); editFileIn = new FileInputStream(editFile); } DataTransferThrottler throttler = GetImageServlet.getThrottler(conf); // send edits TransferFsImage.getFileServer(response, editFile, editFileIn, throttler); } catch (Throwable t) { String errMsg = "getedit failed. " + StringUtils.stringifyException(t); response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg); throw new IOException(errMsg); } finally { IOUtils.closeStream(editFileIn); } }
/** * Finalize the log segment at the given transaction ID. */ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { checkFormatted(); checkRequest(reqInfo); boolean needsValidation = true; // Finalizing the log that the writer was just writing. if (startTxId == curSegmentTxId) { if (curSegment != null) { curSegment.close(); curSegment = null; curSegmentTxId = HdfsConstants.INVALID_TXID; } checkSync(nextTxId == endTxId + 1, "Trying to finalize in-progress log segment %s to end at " + "txid %s but only written up to txid %s", startTxId, endTxId, nextTxId - 1); // No need to validate the edit log if the client is finalizing // the log segment that it was just writing to. needsValidation = false; } FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); if (elf == null) { throw new JournalOutOfSyncException("No log file to finalize at " + "transaction ID " + startTxId); } if (elf.isInProgress()) { if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + "finalized"); elf.validateLog(); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + "txid %s but log %s on disk only contains up to txid %s", startTxId, endTxId, elf.getFile(), elf.getLastTxId()); } fjm.finalizeLogSegment(startTxId, endTxId); } else { Preconditions.checkArgument(endTxId == elf.getLastTxId(), "Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId); } // Once logs are finalized, a different length will never be decided. // During recovery, we treat a finalized segment the same as an accepted // recovery. Thus, we no longer need to keep track of the previously- // accepted decision. The existence of the finalized log segment is enough. purgePaxosDecision(elf.getFirstTxId()); }