/** * Regression test for HDFS-3725. One of the journal nodes is down * during the writing of one segment, then comes back up later to * take part in a later segment. Thus, its local edits are * not a contiguous sequence. This should be handled correctly. */ @Test public void testOneJNMissingSegments() throws Exception { writeSegment(cluster, qjm, 1, 3, true); waitForAllPendingCalls(qjm.getLoggerSetForTests()); cluster.getJournalNode(0).stopAndJoin(0); writeSegment(cluster, qjm, 4, 3, true); waitForAllPendingCalls(qjm.getLoggerSetForTests()); cluster.restartJournalNode(0); writeSegment(cluster, qjm, 7, 3, true); waitForAllPendingCalls(qjm.getLoggerSetForTests()); cluster.getJournalNode(1).stopAndJoin(0); QuorumJournalManager readerQjm = createSpyingQJM(); List<EditLogInputStream> streams = Lists.newArrayList(); try { readerQjm.selectInputStreams(streams, 1, false); verifyEdits(streams, 1, 9); } finally { IOUtils.cleanup(LOG, streams.toArray(new Closeable[0])); readerQjm.close(); } }
/** * Regression test for HDFS-3891: selectInputStreams should throw * an exception when a majority of journalnodes have crashed. */ @Test public void testSelectInputStreamsMajorityDown() throws Exception { // Shut down all of the JNs. cluster.shutdown(); List<EditLogInputStream> streams = Lists.newArrayList(); try { qjm.selectInputStreams(streams, 0, false); fail("Did not throw IOE"); } catch (QuorumException ioe) { GenericTestUtils.assertExceptionContains( "Got too many exceptions", ioe); assertTrue(streams.isEmpty()); } }
@Test public void testSelectInputStreamsNotOnBoundary() throws Exception { final int txIdsPerSegment = 10; for (int txid = 1; txid <= 5 * txIdsPerSegment; txid += txIdsPerSegment) { writeSegment(cluster, qjm, txid, txIdsPerSegment, true); } File curDir = cluster.getCurrentDir(0, JID); GenericTestUtils.assertGlobEquals(curDir, "edits_.*", NNStorage.getFinalizedEditsFileName(1, 10), NNStorage.getFinalizedEditsFileName(11, 20), NNStorage.getFinalizedEditsFileName(21, 30), NNStorage.getFinalizedEditsFileName(31, 40), NNStorage.getFinalizedEditsFileName(41, 50)); ArrayList<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); qjm.selectInputStreams(streams, 25, false); verifyEdits(streams, 25, 50); }
public static long recoverAndReturnLastTxn(QuorumJournalManager qjm) throws IOException { qjm.recoverUnfinalizedSegments(); long lastRecoveredTxn = 0; List<EditLogInputStream> streams = Lists.newArrayList(); try { qjm.selectInputStreams(streams, 0, false); for (EditLogInputStream elis : streams) { assertTrue(elis.getFirstTxId() > lastRecoveredTxn); lastRecoveredTxn = elis.getLastTxId(); } } finally { IOUtils.cleanup(null, streams.toArray(new Closeable[0])); } return lastRecoveredTxn; }
/** * Regression test for HDFS-3725. One of the journal nodes is down * during the writing of one segment, then comes back up later to * take part in a later segment. Thus, its local edits are * not a contiguous sequence. This should be handled correctly. */ @Test public void testOneJNMissingSegments() throws Exception { writeSegment(cluster, qjm, 1, 3, true); waitForAllPendingCalls(qjm.getLoggerSetForTests()); cluster.getJournalNode(0).stopAndJoin(0); writeSegment(cluster, qjm, 4, 3, true); waitForAllPendingCalls(qjm.getLoggerSetForTests()); cluster.restartJournalNode(0); writeSegment(cluster, qjm, 7, 3, true); waitForAllPendingCalls(qjm.getLoggerSetForTests()); cluster.getJournalNode(1).stopAndJoin(0); QuorumJournalManager readerQjm = createSpyingQJM(); List<EditLogInputStream> streams = Lists.newArrayList(); try { readerQjm.selectInputStreams(streams, 1, false, true); verifyEdits(streams, 1, 9); } finally { IOUtils.cleanup(LOG, streams.toArray(new Closeable[0])); readerQjm.close(); } }
/** * Regression test for HDFS-3891: selectInputStreams should throw * an exception when a majority of journalnodes have crashed. */ @Test public void testSelectInputStreamsMajorityDown() throws Exception { // Shut down all of the JNs. cluster.shutdown(); List<EditLogInputStream> streams = Lists.newArrayList(); try { qjm.selectInputStreams(streams, 0, false, true); fail("Did not throw IOE"); } catch (QuorumException ioe) { GenericTestUtils.assertExceptionContains( "Got too many exceptions", ioe); assertTrue(streams.isEmpty()); } }
/** * Ensure that refresh functionality does not work for finalized streams (at * startup) */ @Test public void testRefreshOnlyForInprogress() throws Exception { // start new segment EditLogOutputStream stm = qjm.startLogSegment(0); // write a bunch of transactions QJMTestUtil.writeTxns(stm, 0, 10); qjm.finalizeLogSegment(0, 9); // get input stream List<EditLogInputStream> streams = Lists.newArrayList(); // get only finalized streams qjm.selectInputStreams(streams, 0, false, false); try { // try refreshing the stream (this is startup mode // inprogress segments not allowed -> refresh should fail streams.get(0).refresh(10, 0); fail("The shream should not allow refreshing"); } catch (IOException e) { LOG.info("Expected exception: ", e); } }
/** * Get the journal node we are tailing from, and indicate which stream this is. */ private JournalNode getTailingJN(EditLogInputStream str, URLLogInputStream[] tailingStream) throws Exception { RedundantEditLogInputStream is = (RedundantEditLogInputStream) str; Field curIdxF = RedundantEditLogInputStream.class .getDeclaredField("curIdx"); curIdxF.setAccessible(true); int curIdx = curIdxF.getInt(is); URLLogInputStream[] streams = getStreams(is); JournalNode jn = null; for (JournalNode j : cluster.getJournalNodes()) { if (streams[curIdx].getName().contains( Integer.toString(j.getBoundHttpAddress().getPort()))) { jn = j; break; } } tailingStream[0] = streams[curIdx]; return jn; }
public static long recoverAndReturnLastTxn(QuorumJournalManager qjm) throws IOException { qjm.recoverUnfinalizedSegments(); long lastRecoveredTxn = 0; List<EditLogInputStream> streams = Lists.newArrayList(); try { qjm.selectInputStreams(streams, 0, false, true); for (EditLogInputStream elis : streams) { assertTrue(elis.getFirstTxId() > lastRecoveredTxn); lastRecoveredTxn = elis.getLastTxId(); } } finally { IOUtils.cleanup(null, streams.toArray(new Closeable[0])); } return lastRecoveredTxn; }
@Test public void testGetInputStreamNoValidationNoCheckLastTxId() throws Exception { setupTest("test-get-input-stream-no-validation-no-check-last-txid"); File tempEditsFile = FSEditLogTestUtil.createTempEditsFile( "test-get-input-stream-with-validation"); try { EditLogOutputStream bkeos = bkjm.startLogSegment(1); EditLogOutputStream elfos = new EditLogFileOutputStream(tempEditsFile, null); elfos.create(); FSEditLogTestUtil.populateStreams(1, 100, bkeos, elfos); EditLogInputStream bkeis = getJournalInputStreamDontCheckLastTxId(bkjm, 1); EditLogInputStream elfis = new EditLogFileInputStream(tempEditsFile); Map<String, EditLogInputStream> streamByName = ImmutableMap.of("BookKeeper", bkeis, "File", elfis); FSEditLogTestUtil.assertStreamsAreEquivalent(100, streamByName); } finally { if (!tempEditsFile.delete()) { LOG.warn("Unable to delete edits file: " + tempEditsFile.getAbsolutePath()); } } }
static EditLogInputStream getJournalInputStreamDontCheckLastTxId( JournalManager jm, long txId) throws IOException { List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); jm.selectInputStreams(streams, txId, true, false); if (streams.size() < 1) { throw new IOException("Cannot obtain stream for txid: " + txId); } Collections.sort(streams, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); if (txId == HdfsConstants.INVALID_TXID) { return streams.get(0); } for (EditLogInputStream elis : streams) { if (elis.getFirstTxId() == txId) { return elis; } } throw new IOException("Cannot obtain stream for txid: " + txId); }
@Test public void testSelectInputStreamsNotOnBoundary() throws Exception { final int txIdsPerSegment = 10; for (int txid = 1; txid <= 5 * txIdsPerSegment; txid += txIdsPerSegment) { writeSegment(cluster, qjm, txid, txIdsPerSegment, true); } File curDir = cluster.getCurrentDir(0, JID); GenericTestUtils.assertGlobEquals(curDir, "edits_.*", NNStorage.getFinalizedEditsFileName(1, 10), NNStorage.getFinalizedEditsFileName(11, 20), NNStorage.getFinalizedEditsFileName(21, 30), NNStorage.getFinalizedEditsFileName(31, 40), NNStorage.getFinalizedEditsFileName(41, 50)); ArrayList<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); qjm.selectInputStreams(streams, 25, false, false); verifyEdits(streams, 25, 50); }