public static EditLogOutputStream writeSegment(MiniJournalCluster cluster, QuorumJournalManager qjm, long startTxId, int numTxns, boolean finalize) throws IOException { EditLogOutputStream stm = qjm.startLogSegment(startTxId, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); // Should create in-progress assertExistsInQuorum(cluster, NNStorage.getInProgressEditsFileName(startTxId)); writeTxns(stm, startTxId, numTxns); if (finalize) { stm.close(); qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1); return null; } else { return stm; } }
/**
 * Recovers unfinalized segments, then walks the input streams selected from
 * transaction id 0, asserting that each stream starts after the last
 * transaction id of the stream before it.
 *
 * @param qjm the journal manager to recover and read from
 * @return the last transaction id of the final stream, or 0 when no stream
 *         was selected
 * @throws IOException propagated from the journal manager
 */
public static long recoverAndReturnLastTxn(QuorumJournalManager qjm)
    throws IOException {
  qjm.recoverUnfinalizedSegments();

  List<EditLogInputStream> recovered = Lists.newArrayList();
  long highestTxId = 0;
  try {
    qjm.selectInputStreams(recovered, 0, false);
    for (EditLogInputStream segment : recovered) {
      long firstTxId = segment.getFirstTxId();
      assertTrue(firstTxId > highestTxId);
      highestTxId = segment.getLastTxId();
    }
  } finally {
    // Hand every selected stream to cleanup, whether or not the walk succeeded.
    Closeable[] toClose = recovered.toArray(new Closeable[0]);
    IOUtils.cleanup(null, toClose);
  }
  return highestTxId;
}
/**
 * Test fixture: wipes the "TestJournalNode" directory, configures and starts
 * a JournalNode, then creates a journal under a unique id and applies the
 * FORMAT transition to both its journal and its image.
 */
@Before
public void setup() throws Exception {
  String editsPath = MiniDFSCluster.getBaseDirectory(null) + File.separator
      + "TestJournalNode";
  File journalRoot = new File(editsPath);
  FileUtil.fullyDelete(journalRoot);

  conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      journalRoot.getAbsolutePath());
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");
  httpAddress = "http://localhost:"
      + MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true);

  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();

  journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId();
  byte[] journalIdBytes = QuorumJournalManager.journalIdStringToBytes(journalId);
  journal = jn.getOrCreateJournal(journalIdBytes);

  journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);
  journal.transitionImage(FAKE_NSINFO, Transition.FORMAT, null);
}
/**
 * Test fixture: wipes the "TestJournalNode" directory, configures and starts
 * a JournalNode, applies the FORMAT transition to a journal created under a
 * unique id, and opens an IPC logger channel to the node's bound IPC address.
 */
@Before
public void setup() throws Exception {
  String editsPath = MiniDFSCluster.getBaseDirectory(null) + File.separator
      + "TestJournalNode";
  File journalRoot = new File(editsPath);
  FileUtil.fullyDelete(journalRoot);

  conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      journalRoot.getAbsolutePath());
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");
  // Only the conf update matters here; the returned port is not needed.
  MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true);

  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();

  journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId();
  byte[] journalIdBytes = QuorumJournalManager.journalIdStringToBytes(journalId);
  journal = jn.getOrCreateJournal(journalIdBytes);
  journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);

  ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId,
      jn.getBoundIpcAddress());
}
public static EditLogOutputStream writeSegment(MiniJournalCluster cluster, QuorumJournalManager qjm, long startTxId, int numTxns, boolean finalize) throws IOException { EditLogOutputStream stm = qjm.startLogSegment(startTxId); // Should create in-progress assertExistsInQuorum(cluster, NNStorage.getInProgressEditsFileName(startTxId)); writeTxns(stm, startTxId, numTxns); if (finalize) { stm.close(); qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1); return null; } else { return stm; } }
/**
 * Recovers unfinalized segments, then walks the input streams selected from
 * transaction id 0, asserting that each stream starts after the last
 * transaction id of the stream before it.
 *
 * @param qjm the journal manager to recover and read from
 * @return the last transaction id of the final stream, or 0 when no stream
 *         was selected
 * @throws IOException propagated from the journal manager
 */
public static long recoverAndReturnLastTxn(QuorumJournalManager qjm)
    throws IOException {
  qjm.recoverUnfinalizedSegments();

  List<EditLogInputStream> recovered = Lists.newArrayList();
  long highestTxId = 0;
  try {
    qjm.selectInputStreams(recovered, 0, false, true);
    for (EditLogInputStream segment : recovered) {
      long firstTxId = segment.getFirstTxId();
      assertTrue(firstTxId > highestTxId);
      highestTxId = segment.getLastTxId();
    }
  } finally {
    // Hand every selected stream to cleanup, whether or not the walk succeeded.
    Closeable[] toClose = recovered.toArray(new Closeable[0]);
    IOUtils.cleanup(null, toClose);
  }
  return highestTxId;
}
/**
 * Returns the journal registered under {@code jid}, creating and registering
 * it on first use. The method is synchronized on this instance.
 *
 * @param jid journal identifier; validated by
 *        {@code QuorumJournalManager.checkJournalId} before any lookup
 * @param startOpt startup option passed to the {@code Journal} constructor
 *        when a new journal has to be created (unused on a cache hit)
 * @return the existing or newly created journal
 * @throws IOException propagated from validation or journal construction
 */
synchronized Journal getOrCreateJournal(String jid, StartupOption startOpt)
    throws IOException {
  QuorumJournalManager.checkJournalId(jid);

  Journal existing = journalsById.get(jid);
  if (existing != null) {
    return existing;
  }

  File journalDir = getLogDir(jid);
  LOG.info("Initializing journal in directory " + journalDir);
  Journal created = new Journal(conf, journalDir, jid, startOpt,
      new ErrorReporter());
  journalsById.put(jid, created);
  return created;
}
/**
 * Test fixture: builds a QuorumJournalManager whose loggers are three mock
 * loggers, stubs {@code getJournalState}, {@code newEpoch} and {@code format}
 * on each of them, and then runs {@code recoverUnfinalizedSegments}.
 */
@Before
public void setup() throws Exception {
  spyLoggers = ImmutableList.of(mockLogger(), mockLogger(), mockLogger());

  URI journalUri = new URI("qjournal://host/jid");
  qjm = new QuorumJournalManager(conf, journalUri, FAKE_NSINFO) {
    @Override
    protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
      // Ignore the factory and hand back the pre-built mocks.
      return spyLoggers;
    }
  };

  for (AsyncLogger spy : spyLoggers) {
    GetJournalStateResponseProto journalState =
        GetJournalStateResponseProto.newBuilder()
            .setLastPromisedEpoch(0)
            .setHttpPort(-1)
            .build();
    futureReturns(journalState).when(spy).getJournalState();

    NewEpochResponseProto epochResponse =
        NewEpochResponseProto.newBuilder().build();
    futureReturns(epochResponse).when(spy).newEpoch(Mockito.anyLong());

    futureReturns(null).when(spy).format(Mockito.<NamespaceInfo>any());
  }

  qjm.recoverUnfinalizedSegments();
}
/**
 * Test fixture: builds a QuorumJournalManager whose loggers are three mock
 * loggers, stubs {@code getJournalState}, {@code newEpoch} and the FORMAT
 * {@code transitionJournal} call on each of them, and then runs
 * {@code recoverUnfinalizedSegments}.
 */
@Before
public void setup() throws Exception {
  spyLoggers = ImmutableList.of(mockLogger(), mockLogger(), mockLogger());

  URI journalUri = new URI("qjournal://host/jid");
  qjm = new QuorumJournalManager(conf, journalUri, FAKE_NSINFO, null, false) {
    @Override
    protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
      // Ignore the factory and hand back the pre-built mocks.
      return spyLoggers;
    }
  };

  // A typed null so that Mockito.eq(...) is given a StartupOption argument.
  final StartupOption noStartupOption = null;

  for (AsyncLogger spy : spyLoggers) {
    GetJournalStateResponseProto journalState =
        new GetJournalStateResponseProto();
    journalState.setLastPromisedEpoch(0);
    journalState.setHttpPort(-1);
    futureReturns(journalState).when(spy).getJournalState();

    NewEpochResponseProto epochResponse = new NewEpochResponseProto();
    futureReturns(epochResponse).when(spy).newEpoch(Mockito.anyLong());

    futureReturns(null).when(spy).transitionJournal(
        Mockito.<NamespaceInfo> any(),
        Mockito.eq(Transition.FORMAT),
        Mockito.eq(noStartupOption));
  }

  qjm.recoverUnfinalizedSegments();
}
@Before public void setup() throws Exception { File editsDir = new File(MiniDFSCluster.getBaseDirectory(null) + File.separator + "TestJournalNode"); FileUtil.fullyDelete(editsDir); conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY, editsDir.getAbsolutePath()); conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); int port = MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true); httpAddress = "http://localhost:" + port; jn = new JournalNode(); jn.setConf(conf); jn.start(); journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId(); journal = jn.getOrCreateJournal(QuorumJournalManager .journalIdStringToBytes(journalId)); journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null); journal.transitionImage(FAKE_NSINFO, Transition.FORMAT, null); ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); // this will setup the http port ch.getJournalState(); }
public static EditLogOutputStream writeRandomSegment(MiniJournalCluster cluster, QuorumJournalManager qjm, long startTxId, int numTxns, boolean finalize, List<FSEditLogOp> writtenTxns) throws IOException { EditLogOutputStream stm = qjm.startLogSegment(startTxId); // Should create in-progress assertExistsInQuorum(cluster, NNStorage.getInProgressEditsFileName(startTxId)); List<FSEditLogOp> txns = FSEditLogTestUtil.getContiguousLogSegment( (int) startTxId, ((int) startTxId + numTxns - 1)); for (FSEditLogOp op : txns) { stm.write(op); stm.setReadyToFlush(); stm.flush(); if (writtenTxns != null) { writtenTxns.add(op); } } if (finalize) { stm.close(); qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1); return null; } else { return stm; } }
/**
 * Builds the journal manager matching the scheme of {@code editsUri}.
 *
 * <p>A local-scheme URI yields a {@code FileJournalManagerReadOnly} over the
 * URI's path; a QJM-scheme URI yields a {@code QuorumJournalManager}. Any
 * other URI, including one with no scheme at all, is reported through
 * {@code throwIOException}.
 *
 * @param editsUri location of the edits; must not be null
 * @return the journal manager for the URI; {@code null} only if
 *         {@code throwIOException} returns normally (presumably it always
 *         throws - confirm against its definition)
 * @throws IOException if the scheme is unsupported or construction fails
 */
private JournalManager constructJournalManager(URI editsUri) throws IOException {
  // URI.getScheme() returns null for a scheme-less URI. Comparing with the
  // constant on the left routes that case to the "not supported" branch
  // instead of failing with a NullPointerException.
  final String scheme = editsUri.getScheme();

  if (NNStorage.LOCAL_URI_SCHEME.equals(scheme)) {
    StorageDirectory sd = new NNStorage(new StorageInfo()).new StorageDirectory(
        new File(editsUri.getPath()));
    return new FileJournalManagerReadOnly(sd);
  } else if (QuorumJournalManager.QJM_URI_SCHEME.equals(scheme)) {
    return new QuorumJournalManager(conf, editsUri,
        new NamespaceInfo(new StorageInfo()), null, false);
  } else {
    throwIOException("Other journals not supported yet.", null);
  }
  return null;
}
/** * Shared image needs to be in file storage, or QJM providing that QJM also * stores edits. */ private static String checkImageStorage(URI sharedImage, URI sharedEdits) { if (sharedImage.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { // shared image is stored in file storage return ""; } else if (sharedImage.getScheme().equals( QuorumJournalManager.QJM_URI_SCHEME) && sharedImage.equals(sharedEdits)) { // image is stored in qjm together with edits return ""; } return "Shared image uri: " + sharedImage + " must be either file storage" + " or be equal to shared edits storage " + sharedEdits + ". "; }
/**
 * Forwards {@code si} to every journal manager in {@code journals} that is a
 * {@code QuorumJournalManager}; all other managers are left untouched.
 *
 * @param si the storage info passed on to each quorum journal manager
 */
protected void updateNamespaceInfo(StorageInfo si) {
  for (JournalAndStream entry : journals) {
    JournalManager manager = entry.getManager();
    if (!(manager instanceof QuorumJournalManager)) {
      continue;
    }
    QuorumJournalManager quorumManager = (QuorumJournalManager) manager;
    quorumManager.updateNamespaceInfo(si);
  }
}
/**
 * Returns the journal registered under {@code jid}, creating and registering
 * it on first use. Journals are keyed by a {@code ByteArray} wrapping the id
 * bytes. The method is synchronized on this instance.
 *
 * @param jid journal identifier as bytes
 * @return the existing or newly created journal
 * @throws IOException propagated from directory lookup or journal construction
 */
public synchronized Journal getOrCreateJournal(byte[] jid) throws IOException {
  ByteArray key = new ByteArray(jid);

  Journal existing = journalsById.get(key);
  if (existing != null) {
    return existing;
  }

  String journalId = QuorumJournalManager.journalIdBytesToString(jid);
  File journalDir = getJournalDir(journalId);
  File imageDir = getImageDir(journalId);
  LOG.info("Initializing journal in directory " + journalDir);

  Journal created = new Journal(journalDir, imageDir, journalId,
      new ErrorReporter(), this);
  journalsById.put(key, created);
  return created;
}
/**
 * Returns the journal registered under {@code jid}, creating and registering
 * it on first use. The method is synchronized on this instance.
 *
 * @param jid journal identifier; validated by
 *        {@code QuorumJournalManager.checkJournalId} before any lookup
 * @return the existing or newly created journal
 * @throws IOException propagated from validation or journal construction
 */
synchronized Journal getOrCreateJournal(String jid) throws IOException {
  QuorumJournalManager.checkJournalId(jid);

  Journal existing = journalsById.get(jid);
  if (existing != null) {
    return existing;
  }

  File journalDir = getLogDir(jid);
  LOG.info("Initializing journal in directory " + journalDir);
  Journal created = new Journal(conf, journalDir, jid, new ErrorReporter());
  journalsById.put(jid, created);
  return created;
}