static EditLogInputStream getJournalInputStreamDontCheckLastTxId( JournalManager jm, long txId) throws IOException { List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); jm.selectInputStreams(streams, txId, true, false); if (streams.size() < 1) { throw new IOException("Cannot obtain stream for txid: " + txId); } Collections.sort(streams, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); if (txId == HdfsConstants.INVALID_TXID) { return streams.get(0); } for (EditLogInputStream elis : streams) { if (elis.getFirstTxId() == txId) { return elis; } } throw new IOException("Cannot obtain stream for txid: " + txId); }
@Test public void testNumberOfTransactionsWithGaps() throws Exception { NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi); bkjm.format(nsi); long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; EditLogOutputStream out = bkjm.startLogSegment(start, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(txid++); out.write(op); } out.close(); bkjm.finalizeLogSegment(start, txid-1); assertNotNull( zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); } zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1); long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); try { numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true); fail("Should have thrown corruption exception by this point"); } catch (JournalManager.CorruptionException ce) { // if we get here, everything is going good } numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); }
private JournalManager constructJournalManager(URI editsUri) throws IOException { if (editsUri.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { StorageDirectory sd = new NNStorage(new StorageInfo()).new StorageDirectory( new File(editsUri.getPath())); return new FileJournalManagerReadOnly(sd); } else if (editsUri.getScheme().equals(QuorumJournalManager.QJM_URI_SCHEME)) { return new QuorumJournalManager(conf, editsUri, new NamespaceInfo(new StorageInfo()), null, false); } else { throwIOException("Other journals not supported yet.", null); } return null; }
@Test public void testNumberOfTransactionsWithGaps() throws Exception { NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi); bkjm.format(nsi); long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; EditLogOutputStream out = bkjm.startLogSegment(start); for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(txid++); out.write(op); } out.close(); bkjm.finalizeLogSegment(start, txid-1); assertNotNull( zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); } zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1); long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); try { numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true); fail("Should have thrown corruption exception by this point"); } catch (JournalManager.CorruptionException ce) { // if we get here, everything is going good } numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); }