/** * Tests transaction logging in dfs. */ public void testEditLog() throws IOException { // start a cluster Configuration conf = new Configuration(); MiniDFSCluster cluster = null; FileSystem fileSys = null; try { cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null); cluster.waitActive(); fileSys = cluster.getFileSystem(); final FSNamesystem namesystem = cluster.getNameNode().getNamesystem(); for (Iterator<File> it = cluster.getNameDirs().iterator(); it.hasNext(); ) { File dir = new File(it.next().getPath()); System.out.println(dir); } FSImage fsimage = namesystem.getFSImage(); FSEditLog editLog = fsimage.getEditLog(); // set small size of flush buffer editLog.setBufferCapacity(2048); editLog.close(); editLog.open(); namesystem.getDelegationTokenSecretManager().startThreads(); // Create threads and make them run transactions concurrently. Thread threadId[] = new Thread[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; i++) { Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS); threadId[i] = new Thread(trans, "TransactionThread-" + i); threadId[i].start(); } // wait for all transactions to get over for (int i = 0; i < NUM_THREADS; i++) { try { threadId[i].join(); } catch (InterruptedException e) { i--; // retry } } editLog.close(); // Verify that we can read in all the transactions that we have written. // If there were any corruptions, it is likely that the reading in // of these transactions will throw an exception. // namesystem.getDelegationTokenSecretManager().stopThreads(); int numKeys = namesystem.getDelegationTokenSecretManager().getNumberOfKeys(); for (Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS); System.out.println("Verifying file: " + editFile); int numEdits = FSEditLog.loadFSEdits( new EditLogFileInputStream(editFile), -1); assertTrue("Verification for " + editFile + " failed. " + "Expected " + (NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS + numKeys) + " transactions. "+ "Found " + numEdits + " transactions.", numEdits == NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS +numKeys); } } finally { if(fileSys != null) fileSys.close(); if(cluster != null) cluster.shutdown(); } }
/** * Tests transaction logging in dfs. */ public void testEditLog() throws IOException { // start a cluster Collection<File> namedirs = null; Collection<File> editsdirs = null; Configuration conf = new Configuration(); MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDatanodes, true, true, null, null); cluster.waitActive(); FileSystem fileSys = cluster.getFileSystem(); int numdirs = 0; try { namedirs = cluster.getNameDirs(); editsdirs = cluster.getNameEditsDirs(); } finally { fileSys.close(); cluster.shutdown(); } for (Iterator it = namedirs.iterator(); it.hasNext(); ) { File dir = (File)it.next(); System.out.println(dir); numdirs++; } FSImage fsimage = new FSImage(namedirs, editsdirs); FSEditLog editLog = fsimage.getEditLog(); // set small size of flush buffer editLog.setBufferCapacity(2048); editLog.close(); editLog.open(); // Create threads and make them run transactions concurrently. Thread threadId[] = new Thread[numThreads]; for (int i = 0; i < numThreads; i++) { Transactions trans = new Transactions(editLog, numberTransactions); threadId[i] = new Thread(trans, "TransactionThread-" + i); threadId[i].start(); } // wait for all transactions to get over for (int i = 0; i < numThreads; i++) { try { threadId[i].join(); } catch (InterruptedException e) { i--; // retry } } editLog.close(); // Verify that we can read in all the transactions that we have written. // If there were any corruptions, it is likely that the reading in // of these transactions will throw an exception. // for (Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS); System.out.println("Verifying file: " + editFile); int numEdits = FSEditLog.loadFSEdits(new EditLogFileInputStream(editFile), -1); int numLeases = FSNamesystem.getFSNamesystem().leaseManager.countLease(); System.out.println("Number of outstanding leases " + numLeases); assertEquals(0, numLeases); assertTrue("Verification for " + editFile + " failed. " + "Expected " + (numThreads * 2 * numberTransactions) + " transactions. "+ "Found " + numEdits + " transactions.", numEdits == numThreads * 2 * numberTransactions); } }
/** * Tests transaction logging in dfs. */ public void testEditLog() throws IOException { // start a cluster Collection<File> namedirs = null; Collection<File> editsdirs = null; Configuration conf = new Configuration(); MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDatanodes, true, true, null, null); cluster.waitActive(); FileSystem fileSys = cluster.getFileSystem(); int numdirs = 0; try { namedirs = cluster.getNameDirs(); editsdirs = cluster.getNameEditsDirs(); } finally { fileSys.close(); cluster.shutdown(); } for (Iterator it = namedirs.iterator(); it.hasNext(); ) { File dir = (File)it.next(); System.out.println(dir); numdirs++; } FSImage fsimage = new FSImage(namedirs, editsdirs); FSEditLog editLog = fsimage.getEditLog(); // set small size of flush buffer editLog.setBufferCapacity(2048); editLog.close(); editLog.open(); // Create threads and make them run transactions concurrently. Thread threadId[] = new Thread[numThreads]; for (int i = 0; i < numThreads; i++) { Transactions trans = new Transactions(editLog, numberTransactions); threadId[i] = new Thread(trans, "TransactionThread-" + i); threadId[i].start(); } // wait for all transactions to get over for (int i = 0; i < numThreads; i++) { try { threadId[i].join(); } catch (InterruptedException e) { i--; // retry } } editLog.close(); // Verify that we can read in all the transactions that we have written. // If there were any corruptions, it is likely that the reading in // of these transactions will throw an exception. // for (Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) { File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS); System.out.println("Verifying file: " + editFile); int numEdits = FSEditLog.loadFSEdits(new EditLogFileInputStream(editFile)); int numLeases = FSNamesystem.getFSNamesystem().leaseManager.countLease(); System.out.println("Number of outstanding leases " + numLeases); assertEquals(0, numLeases); assertTrue("Verification for " + editFile + " failed. " + "Expected " + (numThreads * 2 * numberTransactions) + " transactions. "+ "Found " + numEdits + " transactions.", numEdits == numThreads * 2 * numberTransactions); } }