/**
 * Builds a MiniJournalCluster with default settings, then checks the quorum
 * journal URI and the edits directory configured on node 0. The cluster is
 * always shut down afterwards.
 */
@Test
public void testStartStop() throws IOException {
  final Configuration conf = new Configuration();
  final MiniJournalCluster cluster =
      new MiniJournalCluster.Builder(conf).build();
  try {
    // The URI authority is expected to hold three ';'-separated addresses.
    final String authority =
        cluster.getQuorumJournalURI("myjournal").getAuthority();
    final String[] addresses = authority.split(";");
    assertEquals(3, addresses.length);

    final JournalNode firstNode = cluster.getJournalNode(0);
    final String configuredDir =
        firstNode.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
    final File expectedDir =
        new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0");
    assertEquals(expectedDir.getAbsolutePath(), configuredDir);
  } finally {
    cluster.shutdown();
  }
}
private MiniJournalCluster(Builder b) throws IOException { LOG.info("Starting MiniJournalCluster with " + b.numJournalNodes + " journal nodes"); if (b.baseDir != null) { this.baseDir = new File(b.baseDir); } else { this.baseDir = new File(MiniDFSCluster.getBaseDirectory()); } nodes = new JNInfo[b.numJournalNodes]; for (int i = 0; i < b.numJournalNodes; i++) { if (b.format) { File dir = getStorageDir(i); LOG.debug("Fully deleting JN directory " + dir); FileUtil.fullyDelete(dir); } JournalNode jn = new JournalNode(); jn.setConf(createConfForNode(b, i)); jn.start(); nodes[i] = new JNInfo(jn); } }
/**
 * Restarts the journal node at index {@code i}, reusing the recorded IPC
 * address and HTTP(S) server address so the replacement node is configured
 * with the same endpoints.
 *
 * @param i index of the node to restart
 * @throws InterruptedException declared for the stop-and-join of the old node
 * @throws IOException declared for failures while stopping or starting
 */
public void restartJournalNode(int i) throws InterruptedException, IOException {
  final String httpPrefix = "http://";
  final String httpsPrefix = "https://";

  final JNInfo target = nodes[i];
  final JournalNode oldNode = target.node;

  // Copy the old node's configuration before it is stopped.
  final Configuration restartConf = new Configuration(oldNode.getConf());
  if (oldNode.isStarted()) {
    oldNode.stopAndJoin(0);
  }

  restartConf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
      NetUtils.getHostPortString(target.ipcAddr));

  // Strip the scheme and store the remainder under the matching key.
  final String serverUri = target.httpServerURI;
  if (serverUri.startsWith(httpPrefix)) {
    restartConf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
        serverUri.substring(httpPrefix.length()));
  } else if (serverUri.startsWith(httpsPrefix)) {
    restartConf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_KEY,
        serverUri.substring(httpsPrefix.length()));
  }

  final JournalNode replacement = new JournalNode();
  replacement.setConf(restartConf);
  replacement.start();
  target.node = replacement;
}
/**
 * Builds a MiniJournalCluster with default settings, waits for it to become
 * active, and checks the quorum journal URI and the edits directory
 * configured on node 0.
 */
@Test
public void testStartStop() throws IOException {
  Configuration conf = new Configuration();
  MiniJournalCluster c = new MiniJournalCluster.Builder(conf)
      .build();
  try {
    // waitActive() is inside the try so that shutdown() still runs if the
    // wait fails after the cluster has been built.
    c.waitActive();

    URI uri = c.getQuorumJournalURI("myjournal");
    String[] addrs = uri.getAuthority().split(";");
    assertEquals(3, addrs.length);

    JournalNode node = c.getJournalNode(0);
    String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
    assertEquals(
        new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0")
            .getAbsolutePath(),
        dir);
  } finally {
    c.shutdown();
  }
}
/**
 * Per-test fixture: clears the edits directory, starts a JournalNode on a
 * wildcard RPC address, creates and formats a fresh journal, and opens an
 * IPC logger channel to the node.
 */
@Before
public void setup() throws Exception {
  final String editsPath =
      MiniDFSCluster.getBaseDirectory() + File.separator + "TestJournalNode";
  final File storageDir = new File(editsPath);
  FileUtil.fullyDelete(storageDir);

  conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
      storageDir.getAbsolutePath());
  conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");

  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();

  // Each run gets its own journal id via the unique sequence id.
  journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
  journal = jn.getOrCreateJournal(journalId);
  journal.format(FAKE_NSINFO);

  ch = new IPCLoggerChannel(
      conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
}
/**
 * Builds a MiniJournalCluster with default settings, then checks the quorum
 * journal URI and the storage directory configured on node 0. The cluster
 * is always shut down afterwards.
 */
@Test
public void testStartStop() throws IOException {
  final Configuration conf = new Configuration();
  final MiniJournalCluster cluster =
      new MiniJournalCluster.Builder(conf).build();
  try {
    // The URI authority is expected to hold three ';'-separated addresses.
    final URI journalUri = cluster.getQuorumJournalURI("myjournal");
    final String[] addresses = journalUri.getAuthority().split(";");
    assertEquals(3, addresses.length);

    final JournalNode firstNode = cluster.getJournalNode(0);
    final String configuredDir =
        firstNode.getConf().get(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY);
    final File expectedDir =
        new File(MiniDFSCluster.getBaseDirectory(conf), "journalnode-0");
    assertEquals(expectedDir.getAbsolutePath(), configuredDir);
  } finally {
    cluster.shutdown();
  }
}
/** * Rolls the image and asserts contents of the manifests. */ private void assertManifest(int iteration, MD5Hash digest, boolean skipPartial) throws IOException { if (!skipPartial) { for (int i = 0; i < cluster.getNumNodes(); i++) { JournalNode jn = cluster.getJournalNodes()[i]; RemoteImageManifest rim = jn.getJournal(JID.getBytes()) .getImageManifest(-1); assertEquals(iteration + 1, rim.getImages().size()); for (int j = 0; j <= iteration; j++) { assertEquals(startTxId + j, rim.getImages().get(j).getTxId()); } } } // get manifest through qjm RemoteImageManifest rm = qjm.getImageManifest(-1); for (int j = 0; j <= iteration; j++) { assertEquals(startTxId + j, rm.getImages().get(j).getTxId()); } assertEquals(startTxId + iteration, qjm.getLatestImage() .getCheckpointTxId()); }
private void simulateFailute(InjectionEventI event, Object... args) throws IOException { // get the journal node ServletContext context = (ServletContext) args[0]; JournalNode jn = (JournalNode) context .getAttribute(JournalNodeHttpServer.JN_ATTRIBUTE_KEY); // configuration stores the index of the node Configuration conf = jn.getConf(); // check which node this is int jid = conf.getInt(MiniJournalCluster.DFS_JOURNALNODE_TEST_ID, 0); // fail if we are supposed to fail on this event if (event == failOn[jid]) { exceptionsThrown.incrementAndGet(); throw new IOException("Testing failures"); } }
/**
 * Finds the journal node being tailed from and reports the stream in use.
 * The current stream is selected via the private {@code curIdx} field of the
 * redundant input stream, then matched to a journal node whose bound HTTP
 * port appears in the stream's name.
 *
 * @param str           input stream; must be a RedundantEditLogInputStream
 * @param tailingStream output holder; slot 0 receives the current stream
 * @return the matching journal node, or {@code null} if none matches
 */
private JournalNode getTailingJN(EditLogInputStream str,
    URLLogInputStream[] tailingStream) throws Exception {
  final RedundantEditLogInputStream redundant =
      (RedundantEditLogInputStream) str;

  // curIdx is read reflectively from the redundant stream.
  final Field indexField =
      RedundantEditLogInputStream.class.getDeclaredField("curIdx");
  indexField.setAccessible(true);
  final int activeIdx = indexField.getInt(redundant);

  final URLLogInputStream[] candidates = getStreams(redundant);

  JournalNode match = null;
  for (JournalNode candidateNode : cluster.getJournalNodes()) {
    final String streamName = candidates[activeIdx].getName();
    final String httpPort =
        Integer.toString(candidateNode.getBoundHttpAddress().getPort());
    if (streamName.contains(httpPort)) {
      match = candidateNode;
      break;
    }
  }

  tailingStream[0] = candidates[activeIdx];
  return match;
}
/**
 * Per-test fixture: clears the storage directory, reserves an HTTP port,
 * starts a JournalNode, and applies the FORMAT transition to both the
 * journal and the image of a freshly created journal.
 */
@Before
public void setup() throws Exception {
  final String storagePath =
      MiniDFSCluster.getBaseDirectory(null) + File.separator + "TestJournalNode";
  final File storageDir = new File(storagePath);
  FileUtil.fullyDelete(storageDir);

  conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      storageDir.getAbsolutePath());
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");

  // Remember the HTTP address built from the port chosen for this node.
  final int httpPort =
      MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true);
  httpAddress = "http://localhost:" + httpPort;

  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();

  journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId();
  journal = jn.getOrCreateJournal(
      QuorumJournalManager.journalIdStringToBytes(journalId));
  journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);
  journal.transitionImage(FAKE_NSINFO, Transition.FORMAT, null);
}
/**
 * Per-test fixture: clears the storage directory, reserves an HTTP port,
 * starts a JournalNode, applies the FORMAT transition to a freshly created
 * journal, and opens an IPC logger channel to the node.
 */
@Before
public void setup() throws Exception {
  final String storagePath =
      MiniDFSCluster.getBaseDirectory(null) + File.separator + "TestJournalNode";
  final File storageDir = new File(storagePath);
  FileUtil.fullyDelete(storageDir);

  conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      storageDir.getAbsolutePath());
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");
  // Only the configuration update is needed here; the returned port is unused.
  MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true);

  jn = new JournalNode();
  jn.setConf(conf);
  jn.start();

  journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId();
  journal = jn.getOrCreateJournal(
      QuorumJournalManager.journalIdStringToBytes(journalId));
  journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null);

  ch = new IPCLoggerChannel(
      conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
}
/**
 * Asserts that a JournalNode with the given configuration fails to start
 * and that the resulting exception contains the expected text.
 *
 * @param conf      configuration expected to make startup fail
 * @param errString text that must appear in the startup exception
 * @throws AssertionError if the node starts without throwing
 */
private static void assertJNFailsToStart(Configuration conf,
    String errString) {
  try {
    JournalNode jn = new JournalNode();
    jn.setConf(conf);
    jn.start();
    // Reaching this line means startup succeeded, which must fail the check.
    // AssertionError is an Error, so the catch below does not swallow it.
    throw new AssertionError("JournalNode started successfully; expected a "
        + "startup failure containing: " + errString);
  } catch (Exception e) {
    GenericTestUtils.assertExceptionContains(errString, e);
  }
}
/**
 * Stops every JournalNode in the cluster. A node that fails to stop is
 * logged and the remaining nodes are still attempted.
 *
 * @throws IOException if at least one node could not be stopped
 */
public void shutdown() throws IOException {
  boolean allStopped = true;

  for (JournalNode jn : nodes) {
    try {
      jn.stopAndJoin(0);
    } catch (Exception e) {
      // Keep going so that one bad node does not leave the others running.
      allStopped = false;
      LOG.warn("Unable to stop journal node " + jn, e);
    }
  }

  if (!allStopped) {
    throw new IOException("Unable to shut down. Check log for details");
  }
}
/**
 * Restarts the journal node at index {@code i} on the same RPC and HTTP
 * ports and records the new instance in the cluster.
 *
 * @param i index of the node to restart
 * @throws InterruptedException declared for the stop-and-join of the old node
 * @throws IOException declared for failures while stopping or starting
 */
public void restartJournalNode(int i) throws InterruptedException, IOException {
  // Copy the old node's configuration before it is stopped.
  Configuration conf = new Configuration(nodes[i].getConf());
  if (nodes[i].isStarted()) {
    nodes[i].stopAndJoin(0);
  }

  // Pin the new node to the ports recorded for this index.
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
      "127.0.0.1:" + ipcAddrs[i].getPort());
  conf.set(JournalConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
      "127.0.0.1:" + httpAddrs[i].getPort());

  JournalNode jn = new JournalNode();
  jn.setConf(conf);
  jn.start();

  // Replace the stopped instance so that later lookups and shutdown()
  // operate on the running node instead of the old one.
  nodes[i] = jn;
}
/**
 * Resolves the edits storage directory for a journal: the {@code edits}
 * subdirectory of the node's configured storage directory, followed by the
 * journal identifier.
 *
 * @param jn  journal node whose configuration supplies the storage directory
 * @param jid journal identifier; must be non-null and non-empty
 * @return {@code <storage dir>/edits/<jid>}
 */
public static File getJournalDir(JournalNode jn, String jid) {
  final String storageRoot = jn.getConf().get(
      JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      JournalConfigKeys.DFS_JOURNALNODE_DIR_DEFAULT);

  final boolean validJid = jid != null && !jid.isEmpty();
  Preconditions.checkArgument(validJid, "bad journal identifier: %s", jid);

  final File editsRoot = new File(new File(storageRoot), "edits");
  return new File(editsRoot, jid);
}
/**
 * Resolves the image storage directory for a journal: the {@code image}
 * subdirectory of the node's configured storage directory, followed by the
 * journal identifier.
 *
 * @param jn  journal node whose configuration supplies the storage directory
 * @param jid journal identifier; must be non-null and non-empty
 * @return {@code <storage dir>/image/<jid>}
 */
public static File getImageDir(JournalNode jn, String jid) {
  final String storageRoot = jn.getConf().get(
      JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY,
      JournalConfigKeys.DFS_JOURNALNODE_DIR_DEFAULT);

  final boolean validJid = jid != null && !jid.isEmpty();
  Preconditions.checkArgument(validJid, "bad journal identifier: %s", jid);

  final File imageRoot = new File(new File(storageRoot), "image");
  return new File(imageRoot, jid);
}
private JournalNode getStandbyTailingJN() { assertTrue(handler.currentJournalHttpPort != -1); // Find the journal node the Standby is tailing from. JournalNode jn = null; for (JournalNode j : journalCluster.getJournalNodes()) { if (j.getBoundHttpAddress().getPort() == handler.currentJournalHttpPort) { jn = j; break; } } handler.currentJournalRPCPort = jn.getBoundIpcAddress().getPort(); return jn; }