@Before public void setup() throws Exception { File editsDir = new File(MiniDFSCluster.getBaseDirectory() + File.separator + "TestJournalNode"); FileUtil.fullyDelete(editsDir); conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, editsDir.getAbsolutePath()); conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); jn = new JournalNode(); jn.setConf(conf); jn.start(); journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId(); journal = jn.getOrCreateJournal(journalId); journal.format(FAKE_NSINFO); ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); }
@Before public void setup() throws Exception { File editsDir = new File(MiniDFSCluster.getBaseDirectory(null) + File.separator + "TestJournalNode"); FileUtil.fullyDelete(editsDir); conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY, editsDir.getAbsolutePath()); conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true); jn = new JournalNode(); jn.setConf(conf); jn.start(); journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId(); journal = jn.getOrCreateJournal(QuorumJournalManager .journalIdStringToBytes(journalId)); journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null); ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); }
@Before public void setupMock() { conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, LIMIT_QUEUE_SIZE_MB); // Channel to the mock object instead of a real IPC proxy. ch = new IPCLoggerChannel(conf, FAKE_NSINFO, JID, FAKE_ADDR) { @Override protected QJournalProtocol getProxy() throws IOException { return mockProxy; } }; ch.setEpoch(1); }
@Test(timeout=100000) public void testJournal() throws Exception { MetricsRecordBuilder metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get(); ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); ch.setCommittedTxId(100L); ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics); }
@Test(timeout=100000) public void testJournal() throws Exception { MetricsRecordBuilder metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); MetricsAsserts.assertGauge("LastJournalTimestamp", 0L, metrics); long beginTimestamp = System.currentTimeMillis(); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get(); ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); long lastJournalTimestamp = MetricsAsserts.getLongGauge( "LastJournalTimestamp", metrics); assertTrue(lastJournalTimestamp > beginTimestamp); beginTimestamp = lastJournalTimestamp; ch.setCommittedTxId(100L); ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics); lastJournalTimestamp = MetricsAsserts.getLongGauge( "LastJournalTimestamp", metrics); assertTrue(lastJournalTimestamp > beginTimestamp); }
@Before public void setupMock() { conf.setInt(JournalConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, LIMIT_QUEUE_SIZE_MB); // Channel to the mock object instead of a real IPC proxy. ch = new IPCLoggerChannel(conf, FAKE_NSINFO, JID, FAKE_ADDR) { @Override protected QJournalProtocol getProxy() throws IOException { return mockProxy; } }; ch.setEpoch(1); }
@Before public void setup() throws Exception { File editsDir = new File(MiniDFSCluster.getBaseDirectory(null) + File.separator + "TestJournalNode"); FileUtil.fullyDelete(editsDir); conf.set(JournalConfigKeys.DFS_JOURNALNODE_DIR_KEY, editsDir.getAbsolutePath()); conf.set(JournalConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); int port = MiniJournalCluster.getFreeHttpPortAndUpdateConf(conf, true); httpAddress = "http://localhost:" + port; jn = new JournalNode(); jn.setConf(conf); jn.start(); journalId = "test-journalid-" + QJMTestUtil.uniqueSequenceId(); journal = jn.getOrCreateJournal(QuorumJournalManager .journalIdStringToBytes(journalId)); journal.transitionJournal(FAKE_NSINFO, Transition.FORMAT, null); journal.transitionImage(FAKE_NSINFO, Transition.FORMAT, null); ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); // this will setup the http port ch.getJournalState(); }
@Test public void testJournal() throws Exception { //MetricsRecordBuilder metrics = MetricsAsserts.getMetrics( // journal.getMetricsForTests().getName()); //MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics); //MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); //MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1).get(); ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get(); //metrics = MetricsAsserts.getMetrics( // journal.getMetricsForTests().getName()); //MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics); //MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); //MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); ch.setCommittedTxId(100L, false); ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get(); //metrics = MetricsAsserts.getMetrics( // journal.getMetricsForTests().getName()); //MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics); //MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics); //MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics); }
@Test(timeout=100000) public void testJournal() throws Exception { MetricsRecordBuilder metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1).get(); ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); ch.setCommittedTxId(100L); ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( journal.getMetricsForTests().getName()); MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics); }
@Test(timeout=100000) public void testHttpServer() throws Exception { String urlRoot = jn.getHttpServerURI(); // Check default servlets. String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx")); assertTrue("Bad contents: " + pageContents, pageContents.contains( "Hadoop:service=JournalNode,name=JvmMetrics")); // Create some edits on server side byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get(); ch.sendEdits(1L, 1, 3, EDITS_DATA).get(); ch.finalizeLogSegment(1, 3).get(); // Attempt to retrieve via HTTP, ensure we get the data back // including the header we expected byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot + "/getJournal?segmentTxId=1&jid=" + journalId)); byte[] expected = Bytes.concat( Ints.toByteArray(HdfsConstants.NAMENODE_LAYOUT_VERSION), (new byte[] { 0, 0, 0, 0 }), // layout flags section EDITS_DATA); assertArrayEquals(expected, retrievedViaHttp); // Attempt to fetch a non-existent file, check that we get an // error status code URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + journalId); HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection(); try { assertEquals(404, connection.getResponseCode()); } finally { connection.disconnect(); } }
@Test(timeout=100000) public void testHttpServer() throws Exception { String urlRoot = jn.getHttpServerURI(); // Check default servlets. String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx")); assertTrue("Bad contents: " + pageContents, pageContents.contains( "Hadoop:service=JournalNode,name=JvmMetrics")); // Create some edits on server side byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get(); ch.sendEdits(1L, 1, 3, EDITS_DATA).get(); ch.finalizeLogSegment(1, 3).get(); // Attempt to retrieve via HTTP, ensure we get the data back // including the header we expected byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot + "/getJournal?segmentTxId=1&jid=" + journalId)); byte[] expected = Bytes.concat( Ints.toByteArray(HdfsServerConstants.NAMENODE_LAYOUT_VERSION), (new byte[] { 0, 0, 0, 0 }), // layout flags section EDITS_DATA); assertArrayEquals(expected, retrievedViaHttp); // Attempt to fetch a non-existent file, check that we get an // error status code URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + journalId); HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection(); try { assertEquals(404, connection.getResponseCode()); } finally { connection.disconnect(); } }
@Test(timeout=100000) public void testHttpServer() throws Exception { String urlRoot = jn.getHttpServerURI(); // Check default servlets. String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx")); assertTrue("Bad contents: " + pageContents, pageContents.contains( "Hadoop:service=JournalNode,name=JvmMetrics")); // Check JSP page. pageContents = DFSTestUtil.urlGet( new URL(urlRoot + "/journalstatus.jsp")); assertTrue(pageContents.contains("JournalNode")); // Create some edits on server side byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get(); ch.sendEdits(1L, 1, 3, EDITS_DATA).get(); ch.finalizeLogSegment(1, 3).get(); // Attempt to retrieve via HTTP, ensure we get the data back // including the header we expected byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot + "/getJournal?segmentTxId=1&jid=" + journalId)); byte[] expected = Bytes.concat( Ints.toByteArray(HdfsConstants.NAMENODE_LAYOUT_VERSION), (new byte[] { 0, 0, 0, 0 }), // layout flags section EDITS_DATA); assertArrayEquals(expected, retrievedViaHttp); // Attempt to fetch a non-existent file, check that we get an // error status code URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + journalId); HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection(); try { assertEquals(404, connection.getResponseCode()); } finally { connection.disconnect(); } }
@Test public void testHttpServer() throws Exception { InetSocketAddress addr = jn.getBoundHttpAddress(); assertTrue(addr.getPort() > 0); String urlRoot = "http://localhost:" + addr.getPort(); // TODO other servlets // Create some edits on server side int numTxns = 10; byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, numTxns); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1).get(); ch.sendEdits(1L, 1, numTxns, EDITS_DATA).get(); ch.finalizeLogSegment(1, numTxns).get(); // Attempt to retrieve via HTTP, ensure we get the data back // including the header we expected byte[] retrievedViaHttp = QJMTestUtil.urlGetBytes(new URL(urlRoot + "/getJournal?segmentTxId=1&position=0&jid=" + journalId)); byte[] expected = Bytes.concat( Ints.toByteArray(FSConstants.LAYOUT_VERSION), EDITS_DATA); // retrieve partial edits int pos = 100; byte[] expectedPart = new byte[expected.length - pos]; System.arraycopy(expected, pos, expectedPart, 0, expectedPart.length); retrievedViaHttp = QJMTestUtil.urlGetBytes(new URL(urlRoot + "/getJournal?segmentTxId=1&position=" + pos + "&jid=" + journalId)); assertArrayEquals(expectedPart, retrievedViaHttp); // Attempt to fetch a non-existent file, check that we get an // error status code URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&position=0&jid=" + journalId); HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection(); try { assertEquals(404, connection.getResponseCode()); } finally { connection.disconnect(); } }
@Test(timeout=100000) public void testHttpServer() throws Exception { InetSocketAddress addr = jn.getBoundHttpAddress(); assertTrue(addr.getPort() > 0); String urlRoot = "http://localhost:" + addr.getPort(); // Check default servlets. String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx")); assertTrue("Bad contents: " + pageContents, pageContents.contains( "Hadoop:service=JournalNode,name=JvmMetrics")); // Check JSP page. pageContents = DFSTestUtil.urlGet( new URL(urlRoot + "/journalstatus.jsp")); assertTrue(pageContents.contains("JournalNode")); // Create some edits on server side byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3); IPCLoggerChannel ch = new IPCLoggerChannel( conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress()); ch.newEpoch(1).get(); ch.setEpoch(1); ch.startLogSegment(1).get(); ch.sendEdits(1L, 1, 3, EDITS_DATA).get(); ch.finalizeLogSegment(1, 3).get(); // Attempt to retrieve via HTTP, ensure we get the data back // including the header we expected byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot + "/getJournal?segmentTxId=1&jid=" + journalId)); byte[] expected = Bytes.concat( Ints.toByteArray(HdfsConstants.LAYOUT_VERSION), EDITS_DATA); assertArrayEquals(expected, retrievedViaHttp); // Attempt to fetch a non-existent file, check that we get an // error status code URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + journalId); HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection(); try { assertEquals(404, connection.getResponseCode()); } finally { connection.disconnect(); } }