/**
 * Writes transactions 1 through 100 into one log segment, finalizes the
 * segment, and verifies through {@code zkc} that the finalized ledger znode
 * exists while the in-progress znode for txid 1 does not.
 */
@Test
public void testSimpleWrite() throws Exception {
  final NamespaceInfo nsInfo = newNSInfo();
  final BookKeeperJournalManager journal = new BookKeeperJournalManager(
      conf, BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsInfo);
  journal.format(nsInfo);

  EditLogOutputStream segment = journal.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  long txId = 1;
  while (txId <= 100) {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txId);
    segment.write(noOp);
    txId++;
  }
  segment.close();
  journal.finalizeLogSegment(1, 100);

  // The finalized znode must be present and the in-progress one gone.
  final String finalizedPath = journal.finalizedLedgerZNode(1, 100);
  assertNotNull(zkc.exists(finalizedPath, false));
  final String inProgressPath = journal.inprogressZNode(1);
  assertNull(zkc.exists(inProgressPath, false));
}
/**
 * Writes 100 no-op transactions into a single finalized segment and expects
 * {@code getNumberOfTransactions(1, true)} to report exactly 100.
 */
@Test
public void testNumberOfTransactions() throws Exception {
  final NamespaceInfo nsInfo = newNSInfo();
  final BookKeeperJournalManager journal = new BookKeeperJournalManager(
      conf, BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsInfo);
  journal.format(nsInfo);

  EditLogOutputStream segment = journal.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  long txId = 1;
  while (txId <= 100) {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txId);
    segment.write(noOp);
    txId++;
  }
  segment.close();
  journal.finalizeLogSegment(1, 100);

  final long counted = journal.getNumberOfTransactions(1, true);
  assertEquals(100, counted);
}
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm, int startTxid, int endTxid) throws IOException, KeeperException, InterruptedException { EditLogOutputStream out = bkjm.startLogSegment(startTxid, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); for (long i = startTxid; i <= endTxid; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); // finalize the inprogress_1 log segment. bkjm.finalizeLogSegment(startTxid, endTxid); String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid); assertNotNull(zkc.exists(zkpath1, false)); assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false)); return zkpath1; }
/**
 * Feeds the same data to a BookKeeper-backed segment and to a local edits
 * file through {@code populateStreams}, then asserts that an input stream
 * obtained via {@code getJournalInputStreamDontCheckLastTxId} is equivalent
 * to a file input stream over the temp file.
 */
@Test
public void testGetInputStreamNoValidationNoCheckLastTxId() throws Exception {
  setupTest("test-get-input-stream-no-validation-no-check-last-txid");
  // NOTE(review): this label names a different test ("with-validation");
  // it looks like a copy/paste leftover -- confirm before renaming.
  final File editsFile = FSEditLogTestUtil.createTempEditsFile(
      "test-get-input-stream-with-validation");
  try {
    EditLogOutputStream bookKeeperOut = bkjm.startLogSegment(1);
    EditLogOutputStream fileOut =
        new EditLogFileOutputStream(editsFile, null);
    fileOut.create();
    // presumably writes txids 1..100 to every stream -- verify in the helper
    FSEditLogTestUtil.populateStreams(1, 100, bookKeeperOut, fileOut);

    // NOTE(review): neither input stream is closed here -- confirm whether
    // assertStreamsAreEquivalent takes care of that.
    EditLogInputStream bookKeeperIn =
        getJournalInputStreamDontCheckLastTxId(bkjm, 1);
    EditLogInputStream fileIn = new EditLogFileInputStream(editsFile);
    Map<String, EditLogInputStream> namedStreams =
        ImmutableMap.of("BookKeeper", bookKeeperIn, "File", fileIn);
    FSEditLogTestUtil.assertStreamsAreEquivalent(100, namedStreams);
  } finally {
    final boolean deleted = editsFile.delete();
    if (!deleted) {
      LOG.warn("Unable to delete edits file: " +
          editsFile.getAbsolutePath());
    }
  }
}
/**
 * Starts a segment at txid 1, populates it, clears the manager's
 * {@code currentInProgressPath} and runs {@code recoverUnfinalizedSegments()}.
 * Afterwards exactly one ledger must be listed, with last txid 100 and with
 * the ledger id and first txid recorded by the injection handler for the
 * started segment.
 */
@Test
public void testRecoverUnfinalizedSegments() throws Exception {
  setupTest("test-recover-unfinalized-segments");
  // NOTE(review): the handler is installed globally and never reset in this
  // method -- confirm that test teardown clears it.
  TestBKJMInjectionHandler h = new TestBKJMInjectionHandler();
  InjectionHandler.set(h);

  EditLogOutputStream eos = bkjm.startLogSegment(1);
  // presumably writes txids 1..100 -- verify against populateStreams
  FSEditLogTestUtil.populateStreams(1, 100, eos);

  // Drop the in-memory pointer to the in-progress segment before recovery;
  // presumably this mimics a manager that lost its state -- TODO confirm.
  bkjm.currentInProgressPath = null;
  bkjm.recoverUnfinalizedSegments();

  Collection<EditLogLedgerMetadata> ledgers =
      bkjm.metadataManager.listLedgers(true);
  assertEquals("Only one ledger must remain", 1, ledgers.size());

  // JUnit's contract is assertEquals(message, expected, actual); keeping that
  // order makes failure output report the right value as "expected".
  EditLogLedgerMetadata ledger = ledgers.iterator().next();
  assertEquals("Ledger must be finalized with correct lastTxId",
      100, ledger.getLastTxId());
  assertEquals("Finalized ledger's ledgerId must match created ledger's id",
      h.ledgerForStartedSegment.getLedgerId(), ledger.getLedgerId());
  assertEquals("Finalized ledger's firstTxId",
      h.ledgerForStartedSegment.getFirstTxId(), ledger.getFirstTxId());
}
/** * Test that after a BookKeeperEditLogOutputStream we should not be able * to mutate the ledger: verify that the ledger is closed, closed ledger * can't be written to, and the appropriate exception is bubbled up to * the application. */ @Test public void testCloseClosesLedger() throws Exception { LedgerHandle ledger = createLedger(); BookKeeperEditLogOutputStream bkOs = new BookKeeperEditLogOutputStream(ledger); FSEditLogTestUtil.closeStreams(bkOs); try { // Try writing to a ledger ledger.addEntry("a".getBytes(), 0, 1); fail("Closing BookKeeperEditLogOutputStream should close the ledger!"); } catch (BKException e) { // Right exception must be bubbled up assertEquals("LedgerClosedException should propagate!", e.getCode(), Code.LedgerClosedException); } }
/**
 * Writes transactions 1 through 100 into one log segment, finalizes it, and
 * verifies through {@code zkc} that the finalized ledger znode exists and the
 * in-progress znode for txid 1 has been removed.
 */
@Test
public void testSimpleWrite() throws Exception {
  final NamespaceInfo nsInfo = newNSInfo();
  final BookKeeperJournalManager journal = new BookKeeperJournalManager(
      conf, BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsInfo);
  journal.format(nsInfo);

  EditLogOutputStream segment = journal.startLogSegment(1);
  long txId = 1;
  do {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txId);
    segment.write(noOp);
    txId++;
  } while (txId <= 100);
  segment.close();
  journal.finalizeLogSegment(1, 100);

  // Finalized znode present, in-progress znode absent.
  final String finalizedPath = journal.finalizedLedgerZNode(1, 100);
  assertNotNull(zkc.exists(finalizedPath, false));
  final String inProgressPath = journal.inprogressZNode(1);
  assertNull(zkc.exists(inProgressPath, false));
}
/**
 * Writes 100 no-op transactions into a single finalized segment and expects
 * {@code getNumberOfTransactions(1, true)} to return 100.
 */
@Test
public void testNumberOfTransactions() throws Exception {
  final NamespaceInfo nsInfo = newNSInfo();
  final BookKeeperJournalManager journal = new BookKeeperJournalManager(
      conf, BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsInfo);
  journal.format(nsInfo);

  EditLogOutputStream segment = journal.startLogSegment(1);
  long txId = 1;
  do {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txId);
    segment.write(noOp);
    txId++;
  } while (txId <= 100);
  segment.close();
  journal.finalizeLogSegment(1, 100);

  final long counted = journal.getNumberOfTransactions(1, true);
  assertEquals(100, counted);
}
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm, int startTxid, int endTxid) throws IOException, KeeperException, InterruptedException { EditLogOutputStream out = bkjm.startLogSegment(startTxid); for (long i = startTxid; i <= endTxid; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); // finalize the inprogress_1 log segment. bkjm.finalizeLogSegment(startTxid, endTxid); String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid); assertNotNull(zkc.exists(zkpath1, false)); assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false)); return zkpath1; }
@Test public void testNumberOfTransactionsWithGaps() throws Exception { NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi); bkjm.format(nsi); long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; EditLogOutputStream out = bkjm.startLogSegment(start, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(txid++); out.write(op); } out.close(); bkjm.finalizeLogSegment(start, txid-1); assertNotNull( zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); } zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1); long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); try { numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true); fail("Should have thrown corruption exception by this point"); } catch (JournalManager.CorruptionException ce) { // if we get here, everything is going good } numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); }
/**
 * Writes 10000 no-op transactions into one finalized segment, selects input
 * streams starting at txid 1, and asserts that the first selected stream
 * contains all 10000 transactions. Only that first stream is closed.
 */
@Test
public void testSimpleRead() throws Exception {
  final NamespaceInfo nsInfo = newNSInfo();
  final BookKeeperJournalManager journal = new BookKeeperJournalManager(
      conf, BKJMUtil.createJournalURI("/hdfsjournal-simpleread"), nsInfo);
  journal.format(nsInfo);

  final long numTransactions = 10000;
  EditLogOutputStream segment = journal.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  long txId = 1;
  while (txId <= numTransactions) {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txId);
    segment.write(noOp);
    txId++;
  }
  segment.close();
  journal.finalizeLogSegment(1, numTransactions);

  List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
  journal.selectInputStreams(streams, 1, true);
  final EditLogInputStream first = streams.get(0);
  try {
    long counted = FSEditLogTestUtil.countTransactionsInStream(first);
    assertEquals(numTransactions, counted);
  } finally {
    first.close();
  }
}
/**
 * Writes and flushes 100 transactions, then aborts and closes the stream
 * without finalizing. At that point only the in-progress znode must exist;
 * after {@code recoverUnfinalizedSegments()} the finalized znode for txids
 * 1-100 must exist and the in-progress znode must be gone.
 */
@Test
public void testSimpleRecovery() throws Exception {
  final NamespaceInfo nsInfo = newNSInfo();
  final BookKeeperJournalManager journal = new BookKeeperJournalManager(
      conf, BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"), nsInfo);
  journal.format(nsInfo);

  EditLogOutputStream segment = journal.startLogSegment(1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  long txId = 1;
  while (txId <= 100) {
    FSEditLogOp noOp = FSEditLogTestUtil.getNoOpInstance();
    noOp.setTransactionId(txId);
    segment.write(noOp);
    txId++;
  }
  segment.setReadyToFlush();
  segment.flush();

  // Give up on the segment without finalizing it.
  segment.abort();
  segment.close();

  // Before recovery: not finalized, still in progress.
  assertNull(zkc.exists(journal.finalizedLedgerZNode(1, 100), false));
  assertNotNull(zkc.exists(journal.inprogressZNode(1), false));

  journal.recoverUnfinalizedSegments();

  // After recovery: finalized, no longer in progress.
  assertNotNull(zkc.exists(journal.finalizedLedgerZNode(1, 100), false));
  assertNull(zkc.exists(journal.inprogressZNode(1), false));
}
/** * If a journal manager has an empty inprogress node, ensure that we throw an * error, as this should not be possible, and some third party has corrupted * the zookeeper state */ @Test public void testEmptyInprogressNode() throws Exception { URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress"); NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, nsi); bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);; for (long i = 1; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); bkjm.finalizeLogSegment(1, 100); out = bkjm.startLogSegment(101, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); out.close(); bkjm.close(); String inprogressZNode = bkjm.inprogressZNode(101); zkc.setData(inprogressZNode, new byte[0], -1); bkjm = new BookKeeperJournalManager(conf, uri, nsi); try { bkjm.recoverUnfinalizedSegments(); fail("Should have failed. There should be no way of creating" + " an empty inprogess znode"); } catch (IOException e) { // correct behaviour assertTrue("Exception different than expected", e.getMessage().contains( "Invalid/Incomplete data in znode")); } finally { bkjm.close(); } }
/** * If a journal manager has an corrupt inprogress node, ensure that we throw * an error, as this should not be possible, and some third party has * corrupted the zookeeper state */ @Test public void testCorruptInprogressNode() throws Exception { URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress"); NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, nsi); bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);; for (long i = 1; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); bkjm.finalizeLogSegment(1, 100); out = bkjm.startLogSegment(101, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); out.close(); bkjm.close(); String inprogressZNode = bkjm.inprogressZNode(101); zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1); bkjm = new BookKeeperJournalManager(conf, uri, nsi); try { bkjm.recoverUnfinalizedSegments(); fail("Should have failed. There should be no way of creating" + " an empty inprogess znode"); } catch (IOException e) { // correct behaviour assertTrue("Exception different than expected", e.getMessage().contains( "has no field named")); } finally { bkjm.close(); } }