/**
 * Adds edit log input streams from every storage directory to the given
 * collection.
 *
 * For each directory yielded by {@code storage.dirIterator()}, the edit log
 * files in its current directory are matched and handed to
 * {@link FileJournalManager#addStreamsToCollectionFromFiles}.
 *
 * @param streams collection that receives the selected streams
 * @param fromTxId transaction id forwarded to the stream selection
 * @param inProgressOk forwarded unchanged to the stream selection
 * @throws RuntimeException wrapping the IOException raised while matching
 *         the edit logs of a directory
 */
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
    long fromTxId, boolean inProgressOk) {
  for (Iterator<StorageDirectory> dirs = storage.dirIterator();
       dirs.hasNext();) {
    StorageDirectory sd = dirs.next();
    List<EditLogFile> matched;
    try {
      matched = FileJournalManager.matchEditLogs(sd.getCurrentDir());
    } catch (IOException e) {
      // The method signature declares no checked exception, so rethrow
      // unchecked while keeping the original cause.
      throw new RuntimeException(e);
    }
    FileJournalManager.addStreamsToCollectionFromFiles(matched, streams,
        fromTxId, inProgressOk);
  }
}
/**
 * Creates a journal on top of the given log directory.
 *
 * Sets up the storage, refreshes cached data, obtains the journal manager
 * and metrics, then scans storage for the latest edit log segment to seed
 * {@code highestWrittenTxId}.
 *
 * @param conf configuration passed to the storage
 * @param logDir directory passed to the storage
 * @param journalId identifier of this journal
 * @param startOpt startup option passed to the storage
 * @param errorReporter error reporter passed to the storage
 * @throws IOException if storage setup or the storage scan fails
 */
Journal(Configuration conf, File logDir, String journalId,
    StartupOption startOpt, StorageErrorReporter errorReporter)
    throws IOException {
  this.storage = new JNStorage(conf, logDir, startOpt, errorReporter);
  this.journalId = journalId;

  refreshCachedData();

  this.fjm = this.storage.getJournalManager();
  this.metrics = JournalMetrics.create(this);

  // Only overwrite the field when a non-empty segment was found.
  final EditLogFile newestSegment = scanStorageForLatestEdits();
  if (newestSegment != null) {
    this.highestWrittenTxId = newestSegment.getLastTxId();
  }
}
/** * Scan the local storage directory, and return the segment containing * the highest transaction. * @return the EditLogFile with the highest transactions, or null * if no files exist. */ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException { if (!fjm.getStorageDirectory().getCurrentDir().exists()) { return null; } LOG.info("Scanning storage " + fjm); List<EditLogFile> files = fjm.getLogFiles(0); while (!files.isEmpty()) { EditLogFile latestLog = files.remove(files.size() - 1); latestLog.scanLog(); LOG.info("Latest log is " + latestLog); if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) { // the log contains no transactions LOG.warn("Latest log " + latestLog + " has no transactions. " + "moving it aside and looking for previous log"); latestLog.moveAsideEmptyFile(); } else { return latestLog; } } LOG.info("No files in " + fjm); return null; }
/**
 * Describes the segment that starts at the given transaction id.
 *
 * An in-progress segment is scanned first. A segment whose last txid is
 * {@code INVALID_TXID} is treated as empty: it is moved aside and null is
 * returned.
 *
 * @param segmentTxId first transaction id of the segment to look up
 * @return the current state of the given segment, or null if the segment
 *         does not exist or turned out to be empty
 * @throws IOException propagated from the journal manager or log file calls
 */
@VisibleForTesting
SegmentStateProto getSegmentInfo(long segmentTxId) throws IOException {
  final EditLogFile segment = fjm.getLogFile(segmentTxId);
  if (segment == null) {
    return null;
  }

  if (segment.isInProgress()) {
    segment.scanLog();
  }

  if (segment.getLastTxId() == HdfsConstants.INVALID_TXID) {
    LOG.info("Edit log file " + segment + " appears to be empty. " +
        "Moving it aside...");
    segment.moveAsideEmptyFile();
    return null;
  }

  final SegmentStateProto state = SegmentStateProto.newBuilder()
      .setStartTxId(segmentTxId)
      .setEndTxId(segment.getLastTxId())
      .setIsInProgress(segment.isInProgress())
      .build();
  LOG.info("getSegmentInfo(" + segmentTxId + "): " + segment + " -> " +
      TextFormat.shortDebugString(state));
  return state;
}
/**
 * Verifies that a quorum of journal nodes hold a finalized log for the
 * segment, each ending at the expected transaction id.
 *
 * Nodes without a log for the segment, or whose log is still in progress,
 * are not counted.
 *
 * @param cluster cluster whose per-node directories are inspected
 * @param segmentTxId first transaction id of the segment
 * @param expectedEndTxId last transaction id every finalized log must have
 * @throws IOException propagated from the log file lookup
 */
private void checkRecovery(MiniJournalCluster cluster,
    long segmentTxId, long expectedEndTxId) throws IOException {
  int finalizedCount = 0;

  for (int node = 0; node < cluster.getNumNodes(); node++) {
    final File nodeDir = cluster.getCurrentDir(node, JID);
    final EditLogFile segment =
        FileJournalManager.getLogFile(nodeDir, segmentTxId);
    if (segment == null || segment.isInProgress()) {
      continue;
    }

    finalizedCount++;
    if (segment.getLastTxId() != expectedEndTxId) {
      fail("File " + segment + " finalized to wrong txid, expected " +
          expectedEndTxId);
    }
  }

  if (finalizedCount < cluster.getQuorumSize()) {
    fail("Did not find a quorum of finalized logs starting at " +
        segmentTxId);
  }
}
/**
 * Adds edit log input streams from every storage directory to the given
 * collection.
 *
 * For each directory yielded by {@code storage.dirIterator()}, the edit log
 * files in its current directory are matched and handed to
 * {@link FileJournalManager#addStreamsToCollectionFromFiles}, with
 * {@code Long.MAX_VALUE} passed as the fourth argument.
 *
 * @param streams collection that receives the selected streams
 * @param fromTxId transaction id forwarded to the stream selection
 * @param inProgressOk forwarded unchanged to the stream selection
 * @throws RuntimeException wrapping the IOException raised while matching
 *         the edit logs of a directory
 */
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
    long fromTxId, boolean inProgressOk) {
  for (Iterator<StorageDirectory> dirs = storage.dirIterator();
       dirs.hasNext();) {
    StorageDirectory sd = dirs.next();
    List<EditLogFile> matched;
    try {
      matched = FileJournalManager.matchEditLogs(sd.getCurrentDir());
    } catch (IOException e) {
      // The method signature declares no checked exception, so rethrow
      // unchecked while keeping the original cause.
      throw new RuntimeException(e);
    }
    FileJournalManager.addStreamsToCollectionFromFiles(matched, streams,
        fromTxId, Long.MAX_VALUE, inProgressOk);
  }
}
/**
 * Creates a journal on top of the given log directory.
 *
 * Sets up the storage, refreshes cached data, obtains the journal manager
 * and metrics, then scans storage for the latest edit log segment and, if
 * one is found, reports its last txid via
 * {@code updateHighestWrittenTxId}.
 *
 * @param conf configuration passed to the storage
 * @param logDir directory passed to the storage
 * @param journalId identifier of this journal
 * @param startOpt startup option passed to the storage
 * @param errorReporter error reporter passed to the storage
 * @throws IOException if storage setup or the storage scan fails
 */
Journal(Configuration conf, File logDir, String journalId,
    StartupOption startOpt, StorageErrorReporter errorReporter)
    throws IOException {
  this.storage = new JNStorage(conf, logDir, startOpt, errorReporter);
  this.journalId = journalId;

  refreshCachedData();

  this.fjm = this.storage.getJournalManager();
  this.metrics = JournalMetrics.create(this);

  // Nothing to record when storage holds no non-empty segment.
  final EditLogFile newestSegment = scanStorageForLatestEdits();
  if (newestSegment != null) {
    updateHighestWrittenTxId(newestSegment.getLastTxId());
  }
}
/** * Scan the local storage directory, and return the segment containing * the highest transaction. * @return the EditLogFile with the highest transactions, or null * if no files exist. */ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException { if (!fjm.getStorageDirectory().getCurrentDir().exists()) { return null; } LOG.info("Scanning storage " + fjm); List<EditLogFile> files = fjm.getLogFiles(0); while (!files.isEmpty()) { EditLogFile latestLog = files.remove(files.size() - 1); latestLog.scanLog(Long.MAX_VALUE, false); LOG.info("Latest log is " + latestLog); if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) { // the log contains no transactions LOG.warn("Latest log " + latestLog + " has no transactions. " + "moving it aside and looking for previous log"); latestLog.moveAsideEmptyFile(); } else { return latestLog; } } LOG.info("No files in " + fjm); return null; }
/**
 * Describes the segment that starts at the given transaction id.
 *
 * An in-progress segment is scanned first. A segment whose last txid is
 * {@code INVALID_TXID} is treated as empty: it is moved aside and null is
 * returned.
 *
 * @param segmentTxId first transaction id of the segment to look up
 * @return the current state of the given segment, or null if the segment
 *         does not exist or turned out to be empty
 * @throws IOException propagated from the journal manager or log file calls
 */
@VisibleForTesting
SegmentStateProto getSegmentInfo(long segmentTxId) throws IOException {
  final EditLogFile segment = fjm.getLogFile(segmentTxId);
  if (segment == null) {
    return null;
  }

  if (segment.isInProgress()) {
    segment.scanLog(Long.MAX_VALUE, false);
  }

  if (segment.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
    LOG.info("Edit log file " + segment + " appears to be empty. " +
        "Moving it aside...");
    segment.moveAsideEmptyFile();
    return null;
  }

  final SegmentStateProto state = SegmentStateProto.newBuilder()
      .setStartTxId(segmentTxId)
      .setEndTxId(segment.getLastTxId())
      .setIsInProgress(segment.isInProgress())
      .build();
  LOG.info("getSegmentInfo(" + segmentTxId + "): " + segment + " -> " +
      TextFormat.shortDebugString(state));
  return state;
}
/**
 * Verifies that a quorum of journal nodes hold a finalized log for the
 * segment, each ending at the expected transaction id.
 *
 * Nodes without a log for the segment, or whose log is still in progress,
 * are not counted.
 *
 * @param cluster cluster whose per-node journal directories are inspected
 * @param segmentTxId first transaction id of the segment
 * @param expectedEndTxId last transaction id every finalized log must have
 * @throws IOException propagated from the log file lookup
 */
private void checkRecovery(MiniJournalCluster cluster,
    long segmentTxId, long expectedEndTxId) throws IOException {
  int finalizedCount = 0;

  for (int node = 0; node < cluster.getNumNodes(); node++) {
    final File nodeDir = cluster.getJournalCurrentDir(node, JID);
    final EditLogFile segment =
        FileJournalManager.getLogFile(nodeDir, segmentTxId);
    if (segment == null || segment.isInProgress()) {
      continue;
    }

    finalizedCount++;
    if (segment.getLastTxId() != expectedEndTxId) {
      fail("File " + segment + " finalized to wrong txid, expected " +
          expectedEndTxId);
    }
  }

  if (finalizedCount < cluster.getQuorumSize()) {
    fail("Did not find a quorum of finalized logs starting at " +
        segmentTxId);
  }
}
Journal(File logDir, File imageDir, String journalId, StorageErrorReporter errorReporter, JournalNode journalNode) throws IOException { this.journalNode = journalNode; // initialize storage directories journalStorage = new JNStorage(logDir, errorReporter, false, journalNode.getConf()); imageStorage = new JNStorage(imageDir, errorReporter, true, journalNode.getConf()); // initialize journal and image managers this.fjm = new FileJournalManager(journalStorage.getSingularStorageDir(), null, errorReporter); this.imageManager = new FileImageManager( imageStorage.getStorageDirectory(), imageStorage); this.journalId = journalId; this.metrics = JournalMetrics.create(this); refreshCachedData(); EditLogFile latest = scanStorageForLatestEdits(); if (latest != null) { highestWrittenTxId = latest.getLastTxId(); metrics.setLastWrittenTxId(highestWrittenTxId); } }
/** * Scan the local storage directory, and return the segment containing * the highest transaction. * @return the EditLogFile with the highest transactions, or null * if no files exist. */ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException { if (!fjm.getStorageDirectory().getCurrentDir().exists()) { return null; } LOG.info("Scanning storage " + fjm); List<EditLogFile> files = fjm.getLogFiles(0); while (!files.isEmpty()) { EditLogFile latestLog = files.remove(files.size() - 1); latestLog.validateLog(); LOG.info("Latest log is " + latestLog); if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) { // the log contains no transactions LOG.warn("Latest log " + latestLog + " has no transactions. " + "moving it aside and looking for previous log"); latestLog.moveAsideEmptyFile(); } else { return latestLog; } } LOG.info("No files in " + fjm); return null; }
/**
 * Describes the segment that starts at the given transaction id.
 *
 * An in-progress segment is validated first. A segment whose last txid is
 * {@code INVALID_TXID} is treated as empty: it is moved aside and null is
 * returned.
 *
 * @param segmentTxId first transaction id of the segment to look up
 * @return the current state of the given segment, or null if the segment
 *         does not exist or turned out to be empty
 * @throws IOException propagated from the journal manager or log file calls
 */
private SegmentStateProto getSegmentInfo(long segmentTxId)
    throws IOException {
  final EditLogFile segment = fjm.getLogFile(segmentTxId);
  if (segment == null) {
    return null;
  }

  if (segment.isInProgress()) {
    segment.validateLog();
  }

  if (segment.getLastTxId() == HdfsConstants.INVALID_TXID) {
    LOG.info("Edit log file " + segment + " appears to be empty. " +
        "Moving it aside...");
    segment.moveAsideEmptyFile();
    return null;
  }

  final SegmentStateProto state = new SegmentStateProto(
      segmentTxId, segment.getLastTxId(), segment.isInProgress());
  LOG.info("getSegmentInfo(" + segmentTxId + "): " + segment + " -> " +
      state);
  return state;
}
/** * Get the map corresponding to the JSON string. */ public static List<EditLogFile> convertJsonToListManifest(String json) throws IOException { if (json == null || json.isEmpty()) { return new ArrayList<EditLogFile>(); } // get the list of strings from the http response TypeReference<List<String>> type = new TypeReference<List<String>>() { }; List<String> logFilesDesc = mapper.readValue(json, type); // we need to convert the list of strings into edit log files List<EditLogFile> logFiles = new ArrayList<EditLogFile>(); for (String lf : logFilesDesc) { logFiles.add(new EditLogFile(lf)); } return logFiles; }
/**
 * Adds edit log input streams from every storage directory to the given
 * collection.
 *
 * For each directory yielded by {@code storage.dirIterator()}, the edit log
 * files in its current directory are matched and handed to
 * {@link FileJournalManager#addStreamsToCollectionFromFiles}.
 *
 * @param streams collection that receives the selected streams
 * @param fromTxId transaction id forwarded to the stream selection
 * @param inProgressOk forwarded unchanged to the stream selection
 * @param forReading not used by this implementation
 * @throws RuntimeException wrapping the IOException raised while matching
 *         the edit logs of a directory
 */
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
    long fromTxId, boolean inProgressOk, boolean forReading) {
  for (Iterator<StorageDirectory> dirs = storage.dirIterator();
       dirs.hasNext();) {
    StorageDirectory sd = dirs.next();
    List<EditLogFile> matched;
    try {
      matched = FileJournalManager.matchEditLogs(sd.getCurrentDir());
    } catch (IOException e) {
      // The method signature declares no checked exception, so rethrow
      // unchecked while keeping the original cause.
      throw new RuntimeException(e);
    }
    FileJournalManager.addStreamsToCollectionFromFiles(matched, streams,
        fromTxId, inProgressOk);
  }
}
/**
 * Creates a journal on top of the given log directory.
 *
 * Sets up the storage, refreshes cached data, obtains the journal manager
 * and metrics, then scans storage for the latest edit log segment to seed
 * {@code highestWrittenTxId}.
 *
 * @param conf configuration passed to the storage
 * @param logDir directory passed to the storage
 * @param journalId identifier of this journal
 * @param errorReporter error reporter passed to the storage
 * @throws IOException if storage setup or the storage scan fails
 */
Journal(Configuration conf, File logDir, String journalId,
    StorageErrorReporter errorReporter) throws IOException {
  this.storage = new JNStorage(conf, logDir, errorReporter);
  this.journalId = journalId;

  refreshCachedData();

  this.fjm = this.storage.getJournalManager();
  this.metrics = JournalMetrics.create(this);

  // Only overwrite the field when a non-empty segment was found.
  final EditLogFile newestSegment = scanStorageForLatestEdits();
  if (newestSegment != null) {
    this.highestWrittenTxId = newestSegment.getLastTxId();
  }
}
/**
 * Describes the segment that starts at the given transaction id.
 *
 * An in-progress segment is validated first. A segment whose last txid is
 * {@code INVALID_TXID} is treated as empty: it is moved aside and null is
 * returned.
 *
 * @param segmentTxId first transaction id of the segment to look up
 * @return the current state of the given segment, or null if the segment
 *         does not exist or turned out to be empty
 * @throws IOException propagated from the journal manager or log file calls
 */
private SegmentStateProto getSegmentInfo(long segmentTxId)
    throws IOException {
  final EditLogFile segment = fjm.getLogFile(segmentTxId);
  if (segment == null) {
    return null;
  }

  if (segment.isInProgress()) {
    segment.validateLog();
  }

  if (segment.getLastTxId() == HdfsConstants.INVALID_TXID) {
    LOG.info("Edit log file " + segment + " appears to be empty. " +
        "Moving it aside...");
    segment.moveAsideEmptyFile();
    return null;
  }

  final SegmentStateProto state = SegmentStateProto.newBuilder()
      .setStartTxId(segmentTxId)
      .setEndTxId(segment.getLastTxId())
      .setIsInProgress(segment.isInProgress())
      .build();
  LOG.info("getSegmentInfo(" + segmentTxId + "): " + segment + " -> " +
      TextFormat.shortDebugString(state));
  return state;
}