/**
 * Populates a BookKeeper log segment and a local edits file through
 * {@code FSEditLogTestUtil.populateStreams(1, 100, ...)}, opens the
 * BookKeeper segment with {@code getJournalInputStreamDontCheckLastTxId}
 * and asserts that both input streams are equivalent.
 *
 * @throws Exception if setup, writing or reading fails
 */
@Test
public void testGetInputStreamNoValidationNoCheckLastTxId() throws Exception {
  setupTest("test-get-input-stream-no-validation-no-check-last-txid");
  // Name the temp file after this test; the prefix used to be copied from
  // testGetInputStreamWithValidation.
  File tempEditsFile = FSEditLogTestUtil.createTempEditsFile(
      "test-get-input-stream-no-validation-no-check-last-txid");
  try {
    EditLogOutputStream bkeos = bkjm.startLogSegment(1);
    EditLogOutputStream elfos =
        new EditLogFileOutputStream(tempEditsFile, null);
    elfos.create();
    FSEditLogTestUtil.populateStreams(1, 100, bkeos, elfos);

    EditLogInputStream bkeis =
        getJournalInputStreamDontCheckLastTxId(bkjm, 1);
    EditLogInputStream elfis = new EditLogFileInputStream(tempEditsFile);
    Map<String, EditLogInputStream> streamByName =
        ImmutableMap.of("BookKeeper", bkeis, "File", elfis);
    FSEditLogTestUtil.assertStreamsAreEquivalent(100, streamByName);
  } finally {
    // Best-effort cleanup: a failed delete is only logged.
    if (!tempEditsFile.delete()) {
      LOG.warn("Unable to delete edits file: "
          + tempEditsFile.getAbsolutePath());
    }
  }
}
/**
 * Requests edit log manifests from the loggers, waits for a write quorum of
 * responses, creates one URL-backed {@link EditLogFileInputStream} per remote
 * log segment reported, and passes the collected streams to
 * {@code JournalSet.chainAndMakeRedundantStreams}.
 *
 * @param streams collection handed to
 *        {@code JournalSet.chainAndMakeRedundantStreams} (presumably receives
 *        the selected streams)
 * @param fromTxnId transaction id forwarded to the manifest request and to
 *        the chaining step
 * @param inProgressOk forwarded to the manifest request
 * @throws IOException if the quorum call or stream construction fails
 */
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
    long fromTxnId, boolean inProgressOk) throws IOException {
  QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
      loggers.getEditLogManifest(fromTxnId, inProgressOk);
  Map<AsyncLogger, RemoteEditLogManifest> resps = loggers.waitForWriteQuorum(
      q, selectInputStreamsTimeoutMs, "selectInputStreams");

  // Joining every manifest into one string is wasted work unless debug
  // logging is actually on.
  if (LOG.isDebugEnabled()) {
    LOG.debug("selectInputStream manifests:\n"
        + Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
  }

  final PriorityQueue<EditLogInputStream> allStreams =
      new PriorityQueue<EditLogInputStream>(64,
          JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
  for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
    AsyncLogger logger = e.getKey();
    RemoteEditLogManifest manifest = e.getValue();

    for (RemoteEditLog remoteLog : manifest.getLogs()) {
      URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
      EditLogInputStream elis = EditLogFileInputStream.fromUrl(
          connectionFactory, url, remoteLog.getStartTxId(),
          remoteLog.getEndTxId(), remoteLog.isInProgress());
      allStreams.add(elis);
    }
  }
  JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
}
private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage, String logFileName, long startTxId) throws IOException { long numEdits = -1; // Verify that we can read in all the transactions that we have written. // If there were any corruptions, it is likely that the reading in // of these transactions will throw an exception. for (StorageDirectory sd : fsimage.storage.dirIterable(NameNodeDirType.EDITS)) { File editFile = new File(sd.getCurrentDir(), logFileName); System.out.println("Verifying file: " + editFile); FSEditLogLoader loader = new FSEditLogLoader(namesystem); int numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile), startTxId -1); System.out.println("Number of edits: " + numEditsThisLog); LOG.info("num edits: " + numEdits + " this log: " + numEditsThisLog); assertTrue(numEdits == -1 || numEditsThisLog == numEdits); numEdits = numEditsThisLog; } assertTrue(numEdits != -1); return numEdits; }
/**
 * Builds an edit log input stream over an in-memory byte array: records the
 * array length, wraps the bytes in a position-tracking stream, reads the log
 * version from it and creates the op reader on the same stream.
 *
 * @param data bytes to read edit log content from
 * @throws IOException if the log version cannot be read
 */
public EditLogByteInputStream(byte[] data) throws IOException {
  len = data.length;
  input = new ByteArrayInputStream(data);

  // Buffered view of the raw bytes, fed into the position tracker.
  DataInputStream bufferedIn =
      new DataInputStream(new BufferedInputStream(input));
  tracker = new FSEditLogLoader.PositionTrackingInputStream(bufferedIn);

  // All reads below go through the tracker.
  DataInputStream trackedIn = new DataInputStream(tracker);
  version = EditLogFileInputStream.readLogVersion(trackedIn);
  reader = new FSEditLogOp.Reader(trackedIn, version);
}
/**
 * Populates a new ledger and the temp edits file via
 * {@code FSEditLogTestUtil.createAndPopulateStreams(1, numEdits, ...)}, then
 * checks that the BookKeeper and file input streams report the same length,
 * are equivalent, and that the BookKeeper stream has nothing left to read.
 *
 * @param numEdits last transaction id passed to the populate helper
 * @throws Exception if the ledger or the file cannot be written or read
 */
private void testReadFromClosedLedgerAfterWriteInner(int numEdits)
    throws Exception {
  LedgerHandle ledgerOut = createLedger();
  long ledgerId = ledgerOut.getId();
  BookKeeperEditLogOutputStream bkEditsOut =
      new BookKeeperEditLogOutputStream(ledgerOut);
  EditLogFileOutputStream fileEditsOut =
      new EditLogFileOutputStream(tempEditsFile, null);

  FSEditLogTestUtil.createAndPopulateStreams(1, numEdits, bkEditsOut,
      fileEditsOut);

  BookKeeperEditLogInputStream bkEditsIn =
      new BookKeeperEditLogInputStream(ledgerProvider, ledgerId, 0, 1,
          numEdits, false);
  EditLogFileInputStream fileEditsIn =
      new EditLogFileInputStream(tempEditsFile);

  assertEquals("Length in bytes must be equal!",
      bkEditsIn.length(), fileEditsIn.length());

  FSEditLogTestUtil.assertStreamsAreEquivalent(numEdits,
      ImmutableMap.of("BookKeeper", bkEditsIn, "File", fileEditsIn));
  // Report the real last txid; the message used to hard-code 100.
  assertNull("BookKeeper edit log must end at txid " + numEdits,
      bkEditsIn.readOp());
}
/**
 * Installs a {@code TestBKJMInjectionHandler}, populates a BookKeeper log
 * segment and a local edits file, opens the BookKeeper segment through
 * {@code FSEditLogTestUtil.getJournalInputStream(bkjm, 1, true)} and asserts
 * that the streams are equivalent and that the handler captured a log
 * validation reporting 100 transactions and end txid 100.
 *
 * @throws Exception if setup, writing or reading fails
 */
@Test
public void testGetInputStreamWithValidation() throws Exception {
  setupTest("test-get-input-stream-with-validation");
  File tempEditsFile = FSEditLogTestUtil.createTempEditsFile(
      "test-get-input-stream-with-validation");
  try {
    TestBKJMInjectionHandler handler = new TestBKJMInjectionHandler();
    InjectionHandler.set(handler);

    // Write the same content to BookKeeper and to the local file.
    EditLogOutputStream bkOut = bkjm.startLogSegment(1);
    EditLogOutputStream fileOut =
        new EditLogFileOutputStream(tempEditsFile, null);
    fileOut.create();
    FSEditLogTestUtil.populateStreams(1, 100, bkOut, fileOut);

    // Read both back and compare.
    EditLogInputStream bkIn =
        FSEditLogTestUtil.getJournalInputStream(bkjm, 1, true);
    EditLogInputStream fileIn = new EditLogFileInputStream(tempEditsFile);
    Map<String, EditLogInputStream> inputsByName =
        ImmutableMap.of("BookKeeper", bkIn, "File", fileIn);
    FSEditLogTestUtil.assertStreamsAreEquivalent(100, inputsByName);

    // The handler must have recorded the validation result.
    assertNotNull("Log was validated", handler.logValidation);
    assertEquals("numTrasactions validated correctly",
        100, handler.logValidation.getNumTransactions());
    assertEquals("endTxId validated correctly",
        100, handler.logValidation.getEndTxId());
  } finally {
    boolean deleted = tempEditsFile.delete();
    if (!deleted) {
      LOG.warn("Unable to delete edits file: " +
          tempEditsFile.getAbsolutePath());
    }
  }
}
/**
 * Requests edit log manifests from the loggers, waits for a write quorum of
 * responses, creates one URL-backed {@link EditLogFileInputStream} per remote
 * log segment reported, and passes the collected streams to
 * {@code JournalSet.chainAndMakeRedundantStreams}.
 *
 * @param streams collection handed to
 *        {@code JournalSet.chainAndMakeRedundantStreams} (presumably receives
 *        the selected streams)
 * @param fromTxnId transaction id forwarded to the manifest request and to
 *        the chaining step
 * @param inProgressOk forwarded to the chaining step only
 * @param forReading forwarded to the manifest request
 * @throws IOException if the quorum call or stream construction fails
 */
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
    long fromTxnId, boolean inProgressOk, boolean forReading)
    throws IOException {
  QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
      loggers.getEditLogManifest(fromTxnId, forReading);
  Map<AsyncLogger, RemoteEditLogManifest> resps = loggers.waitForWriteQuorum(
      q, selectInputStreamsTimeoutMs, "selectInputStreams");

  // Joining every manifest into one string is wasted work unless debug
  // logging is actually on.
  if (LOG.isDebugEnabled()) {
    LOG.debug("selectInputStream manifests:\n"
        + Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
  }

  final PriorityQueue<EditLogInputStream> allStreams =
      new PriorityQueue<EditLogInputStream>(64,
          JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
  for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
    AsyncLogger logger = e.getKey();
    RemoteEditLogManifest manifest = e.getValue();

    for (RemoteEditLog remoteLog : manifest.getLogs()) {
      URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
      EditLogInputStream elis = EditLogFileInputStream.fromUrl(
          url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
          remoteLog.isInProgress());
      allStreams.add(elis);
    }
  }
  JournalSet.chainAndMakeRedundantStreams(
      streams, allStreams, fromTxnId, inProgressOk);
}
/**
 * Requests edit log manifests from the loggers, waits for a write quorum of
 * responses, creates one URL-backed {@link EditLogFileInputStream} per remote
 * log segment reported, and passes the collected streams to
 * {@code JournalSet.chainAndMakeRedundantStreams}.
 *
 * @param streams collection handed to
 *        {@code JournalSet.chainAndMakeRedundantStreams} (presumably receives
 *        the selected streams)
 * @param fromTxnId transaction id forwarded to the manifest request and to
 *        the chaining step
 * @param inProgressOk forwarded to the manifest request
 * @param forReading forwarded to the manifest request
 * @throws IOException if the quorum call or stream construction fails
 */
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
    long fromTxnId, boolean inProgressOk, boolean forReading)
    throws IOException {
  QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
      loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk);
  Map<AsyncLogger, RemoteEditLogManifest> resps = loggers.waitForWriteQuorum(
      q, selectInputStreamsTimeoutMs, "selectInputStreams");

  // Joining every manifest into one string is wasted work unless debug
  // logging is actually on.
  if (LOG.isDebugEnabled()) {
    LOG.debug("selectInputStream manifests:\n"
        + Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
  }

  final PriorityQueue<EditLogInputStream> allStreams =
      new PriorityQueue<EditLogInputStream>(64,
          JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
  for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
    AsyncLogger logger = e.getKey();
    RemoteEditLogManifest manifest = e.getValue();

    for (RemoteEditLog remoteLog : manifest.getLogs()) {
      URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
      EditLogInputStream elis = EditLogFileInputStream.fromUrl(
          url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
          remoteLog.isInProgress());
      allStreams.add(elis);
    }
  }
  JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
}
private void verifyEditLogs(FSNamesystem namesystem, FSImage fsimage) throws IOException { // Verify that we can read in all the transactions that we have written. // If there were any corruptions, it is likely that the reading in // of these transactions will throw an exception. for (Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS); System.out.println("Verifying file: " + editFile); int numEdits = new FSEditLogLoader(namesystem).loadFSEdits( new EditLogFileInputStream(editFile)); System.out.println("Number of edits: " + numEdits); } }
/**
 * Rewrites every edit log file in the freshly created name node storage
 * directories by copying its ops into a new file created with
 * {@code NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1}, starts a cluster
 * on top of them, and checks that the first inotify event is the CREATE of
 * {@code /TestUpgrade}.
 *
 * @throws Exception if storage setup, log rewriting or cluster startup fails
 */
@Test
public void testPreserveEditLogs() throws Exception {
  conf = new HdfsConfiguration();
  conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
  String[] nameNodeDirs =
      conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
  conf.setBoolean(
      DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);

  log("Normal NameNode upgrade", 1);
  File[] created =
      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
  for (final File createdDir : created) {
    List<String> fileNameList =
        IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
    for (String fileName : fileNameList) {
      String tmpFileName = fileName + ".tmp";
      File existingFile = new File(createdDir, fileName);
      File tmpFile = new File(createdDir, tmpFileName);
      // Move the original aside so a new file can take its name.
      Files.move(existingFile.toPath(), tmpFile.toPath());
      File newFile = new File(createdDir, fileName);
      Preconditions.checkState(newFile.createNewFile(),
          "Cannot create new edits log file in " + createdDir);

      EditLogFileInputStream in = new EditLogFileInputStream(tmpFile,
          HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, false);
      try {
        EditLogFileOutputStream out = new EditLogFileOutputStream(conf,
            newFile, (int) tmpFile.length());
        out.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1);
        // Copy every op from the original log into the new one.
        FSEditLogOp logOp = in.readOp();
        while (logOp != null) {
          out.write(logOp);
          logOp = in.readOp();
        }
        out.setReadyToFlush();
        out.flushAndSync(true);
        out.close();
      } finally {
        // Release the reader before its backing file is deleted; it used to
        // be left open.
        in.close();
      }
      Files.delete(tmpFile.toPath());
    }
  }

  cluster = createCluster();

  DFSInotifyEventInputStream ieis =
      cluster.getFileSystem().getInotifyEventStream(0);
  EventBatch batch = ieis.poll();
  Event[] events = batch.getEvents();
  assertTrue("Should be able to get transactions before the upgrade.",
      events.length > 0);
  // Expected value first, so failure messages read correctly.
  assertEquals(Event.EventType.CREATE, events[0].getEventType());
  assertEquals("/TestUpgrade", ((CreateEvent) events[0]).getPath());
  cluster.shutdown();
  UpgradeUtilities.createEmptyDirs(nameNodeDirs);
}
/**
 * Populates a ledger and the temp edits file with the same content, then
 * reads both back one op at a time, refreshing each stream to its last
 * reported position before every read and asserting that positions and ops
 * stay equal.
 *
 * @param numEdits last transaction id passed to the populate helper and the
 *        number of read iterations
 * @throws Exception if the ledger or the file cannot be written or read
 */
private void testReadAndRefreshAfterEachTransactionInner(int numEdits)
    throws Exception {
  FSEditLog.sizeFlushBuffer = 100;

  LedgerHandle ledger = createLedger();
  long ledgerId = ledger.getId();
  BookKeeperEditLogOutputStream bkOut =
      new BookKeeperEditLogOutputStream(ledger);
  EditLogFileOutputStream fileOut =
      new EditLogFileOutputStream(tempEditsFile, null);
  FSEditLogTestUtil.createAndPopulateStreams(1, numEdits, bkOut, fileOut);

  BookKeeperEditLogInputStream bkIn = new BookKeeperEditLogInputStream(
      ledgerProvider, ledgerId, 0, 1, numEdits, false);
  EditLogFileInputStream fileIn = new EditLogFileInputStream(tempEditsFile);

  assertEquals("Length in bytes must be equal!",
      bkIn.length(), fileIn.length());

  long bkPos = bkIn.getPosition();
  long filePos = fileIn.getPosition();
  int txId = 1;
  while (txId <= numEdits) {
    assertEquals("Position in file must be equal position in bk",
        bkPos, filePos);

    // Re-position both streams before each read.
    bkIn.refresh(bkPos, -1);
    fileIn.refresh(filePos, -1);

    FSEditLogOp bkOp = bkIn.readOp();
    FSEditLogOp fileOp = fileIn.readOp();
    if (LOG.isDebugEnabled()) {
      LOG.debug("txId = " + txId + ", opFromBk = " + bkOp
          + ", opFromFile = " + fileOp);
    }
    assertEquals(
        "Operation read from file and BookKeeper must be same after refresh",
        bkOp, fileOp);

    bkPos = bkIn.getPosition();
    filePos = fileIn.getPosition();
    txId++;
  }

  assertNull("BookKeeper edit log must end at last txId", bkIn.readOp());
}
private void testReadBufferGreaterThanLedgerSizeInner(int numEdits) throws Exception { LedgerHandle ledgerOut = createLedger(); long ledgerId = ledgerOut.getId(); BookKeeperEditLogInputStream bkEditsIn = new BookKeeperEditLogInputStream(ledgerProvider, ledgerId, 0, 1, -1, true); EditLogFileOutputStream fileEditsOut = new EditLogFileOutputStream(tempEditsFile, null); bkEditsIn.init(); // Set the edit log buffer flush size smaller than the size of // of the buffer in BufferedInputStream in BookKeeperJournalInputStream FSEditLog.sizeFlushBuffer = bkEditsIn.bin.available() / 3; LOG.info("Set flush buffer size to " + FSEditLog.sizeFlushBuffer); BookKeeperEditLogOutputStream bkEditsOut = new BookKeeperEditLogOutputStream(ledgerOut); FSEditLogTestUtil.createAndPopulateStreams(1, numEdits, bkEditsOut, fileEditsOut); // Re-try refreshing up to ten times until we are able to refresh // successfully to be beginning of the ledger and read the edit log // layout version int maxTries = 10; for (int i = 0; i < maxTries; i++) { try { bkEditsIn.refresh(0, -1); assertEquals("refresh succeeded", bkEditsIn.logVersion, FSConstants.LAYOUT_VERSION); } catch (AssertionFailedError e) { if (i == maxTries) { // Fail the unit test rethrowing the assertion failure if we've // reached the maximum number of retries throw e; } } } EditLogFileInputStream fileEditsIn = new EditLogFileInputStream(tempEditsFile); for (int i = 0; i <= numEdits; i++) { FSEditLogOp opFromBk = bkEditsIn.readOp(); FSEditLogOp opFromFile = fileEditsIn.readOp(); if (LOG.isDebugEnabled()) { LOG.debug("txId = " + i + ", " + "opFromBk = " + opFromBk + ", opFromFile = " + opFromFile); } assertEquals( "Operation read from file and BookKeeper must be same after refresh", opFromBk, opFromFile); } assertNull("BookKeeper edit log must end at txid 1000", bkEditsIn.readOp()); }
/** * Merge Journal Spool to memory.<p> * Journal Spool reader reads journal records from edits.new. * When it reaches the end of the file it sets {@link JSpoolState} to WAIT. * This blocks journaling (see {@link #journal(int,byte[])}. * The reader * <ul> * <li> reads remaining journal records if any,</li> * <li> renames edits.new to edits,</li> * <li> sets {@link JSpoolState} to OFF,</li> * <li> and notifies the journaling thread.</li> * </ul> * Journaling resumes with applying new journal records to the memory state, * and writing them into edits file(s). */ void convergeJournalSpool() throws IOException { Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS); if(! itEdits.hasNext()) throw new IOException("Could not locate checkpoint directories"); StorageDirectory sdEdits = itEdits.next(); int numEdits = 0; File jSpoolFile = getJSpoolFile(sdEdits); long startTime = now(); if(jSpoolFile.exists()) { // load edits.new EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile); DataInputStream in = edits.getDataInputStream(); FSEditLogLoader logLoader = new FSEditLogLoader(namesystem); numEdits += logLoader.loadFSEdits(in, false); // first time reached the end of spool jsState = JSpoolState.WAIT; numEdits += logLoader.loadEditRecords(getLayoutVersion(), in, true); getFSNamesystem().dir.updateCountForINodeWithQuota(); edits.close(); } FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath() + " of size " + jSpoolFile.length() + " edits # " + numEdits + " loaded in " + (now()-startTime)/1000 + " seconds."); // rename spool edits.new to edits making it in sync with the active node // subsequent journal records will go directly to edits editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE); // write version file resetVersion(false, imageDigest); // wake up journal writer synchronized(this) { jsState = JSpoolState.OFF; notifyAll(); } // Rename lastcheckpoint.tmp to previous.checkpoint for(StorageDirectory sd : 
storageDirs) { moveLastCheckpoint(sd); } }
/** * Tests transaction logging in dfs. */ public void testEditLog() throws IOException { // start a cluster Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = null; FileSystem fileSys = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build(); cluster.waitActive(); fileSys = cluster.getFileSystem(); final FSNamesystem namesystem = cluster.getNamesystem(); for (Iterator<URI> it = cluster.getNameDirs().iterator(); it.hasNext(); ) { File dir = new File(it.next().getPath()); System.out.println(dir); } FSImage fsimage = namesystem.getFSImage(); FSEditLog editLog = fsimage.getEditLog(); // set small size of flush buffer editLog.setBufferCapacity(2048); editLog.close(); editLog.open(); namesystem.getDelegationTokenSecretManager().startThreads(); // Create threads and make them run transactions concurrently. Thread threadId[] = new Thread[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; i++) { Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS); threadId[i] = new Thread(trans, "TransactionThread-" + i); threadId[i].start(); } // wait for all transactions to get over for (int i = 0; i < NUM_THREADS; i++) { try { threadId[i].join(); } catch (InterruptedException e) { i--; // retry } } editLog.close(); // Verify that we can read in all the transactions that we have written. // If there were any corruptions, it is likely that the reading in // of these transactions will throw an exception. 
// FSEditLogLoader loader = new FSEditLogLoader(namesystem); namesystem.getDelegationTokenSecretManager().stopThreads(); int numKeys = namesystem.getDelegationTokenSecretManager().getNumberOfKeys(); for (Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS); System.out.println("Verifying file: " + editFile); int numEdits = loader.loadFSEdits( new EditLogFileInputStream(editFile)); assertTrue("Verification for " + editFile + " failed. " + "Expected " + (NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS + numKeys) + " transactions. "+ "Found " + numEdits + " transactions.", numEdits == NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS +numKeys); } } finally { if(fileSys != null) fileSys.close(); if(cluster != null) cluster.shutdown(); } }
/** * Test edit log with different initial buffer size * * @param initialSize initial edit log buffer size * @throws IOException */ private void testEditLog(int initialSize) throws IOException { // start a cluster Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = null; FileSystem fileSys = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build(); cluster.waitActive(); fileSys = cluster.getFileSystem(); final FSNamesystem namesystem = cluster.getNamesystem(); for (Iterator<URI> it = cluster.getNameDirs().iterator(); it.hasNext(); ) { File dir = new File(it.next().getPath()); System.out.println(dir); } FSImage fsimage = namesystem.getFSImage(); FSEditLog editLog = fsimage.getEditLog(); // set small size of flush buffer editLog.setBufferCapacity(initialSize); editLog.close(); editLog.open(); // Create threads and make them run transactions concurrently. Thread threadId[] = new Thread[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; i++) { Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS); threadId[i] = new Thread(trans, "TransactionThread-" + i); threadId[i].start(); } // wait for all transactions to get over for (int i = 0; i < NUM_THREADS; i++) { try { threadId[i].join(); } catch (InterruptedException e) { i--; // retry } } editLog.close(); editLog.open(); // Verify that we can read in all the transactions that we have written. // If there were any corruptions, it is likely that the reading in // of these transactions will throw an exception. 
// FSEditLogLoader loader = new FSEditLogLoader(namesystem); for (Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS); System.out.println("Verifying file: " + editFile); int numEdits = loader.loadFSEdits( new EditLogFileInputStream(editFile)); int numLeases = namesystem.leaseManager.countLease(); System.out.println("Number of outstanding leases " + numLeases); assertEquals(0, numLeases); assertTrue("Verification for " + editFile + " failed. " + "Expected " + (NUM_THREADS * 2 * NUM_TRANSACTIONS) + " transactions. "+ "Found " + numEdits + " transactions.", numEdits == NUM_THREADS * 2 * NUM_TRANSACTIONS); } } finally { if(fileSys != null) fileSys.close(); if(cluster != null) cluster.shutdown(); } }