/**
 * Writes the current delegation tokens to the fsimage stream.
 * The token count is written first, followed by each token identifier and
 * the renew date recorded for it. Progress is reported as a
 * DELEGATION_TOKENS step of the SAVING_CHECKPOINT phase.
 *
 * @param out stream the tokens are written to
 * @param sdPath path used to label the startup progress step
 * @throws IOException if writing to the stream fails
 */
private synchronized void saveCurrentTokens(DataOutputStream out,
    String sdPath) throws IOException {
  StartupProgress progress = NameNode.getStartupProgress();
  Step tokenStep = new Step(StepType.DELEGATION_TOKENS, sdPath);
  progress.beginStep(Phase.SAVING_CHECKPOINT, tokenStep);
  progress.setTotal(Phase.SAVING_CHECKPOINT, tokenStep, currentTokens.size());
  Counter written = progress.getCounter(Phase.SAVING_CHECKPOINT, tokenStep);

  out.writeInt(currentTokens.size());
  for (DelegationTokenIdentifier tokenId : currentTokens.keySet()) {
    tokenId.write(out);
    out.writeLong(currentTokens.get(tokenId).getRenewDate());
    written.increment();
  }

  progress.endStep(Phase.SAVING_CHECKPOINT, tokenStep);
}
/**
 * Writes all delegation keys to the fsimage stream.
 * The key count is written first, followed by each key. Progress is
 * reported as a DELEGATION_KEYS step of the SAVING_CHECKPOINT phase.
 *
 * @param out stream the keys are written to
 * @param sdPath path used to label the startup progress step
 * @throws IOException if writing to the stream fails
 */
private synchronized void saveAllKeys(DataOutputStream out, String sdPath)
    throws IOException {
  StartupProgress prog = NameNode.getStartupProgress();
  Step step = new Step(StepType.DELEGATION_KEYS, sdPath);
  prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  // The step total must match what the loop below counts: the number of
  // keys, not the number of tokens.
  prog.setTotal(Phase.SAVING_CHECKPOINT, step, allKeys.size());
  Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
  out.writeInt(allKeys.size());
  for (Integer keyId : allKeys.keySet()) {
    allKeys.get(keyId).write(out);
    counter.increment();
  }
  prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/**
 * Reads delegation tokens from the fsimage stream.
 * The stream is expected to hold a token count followed, per token, by the
 * identifier fields and a long value that is handed to
 * addPersistedDelegationToken together with the identifier. Progress is
 * reported as a DELEGATION_TOKENS step of the LOADING_FSIMAGE phase.
 *
 * @param in stream the tokens are read from
 * @throws IOException if reading from the stream fails
 */
private synchronized void loadCurrentTokens(DataInput in)
    throws IOException {
  StartupProgress progress = NameNode.getStartupProgress();
  Step tokenStep = new Step(StepType.DELEGATION_TOKENS);
  progress.beginStep(Phase.LOADING_FSIMAGE, tokenStep);

  int tokenCount = in.readInt();
  progress.setTotal(Phase.LOADING_FSIMAGE, tokenStep, tokenCount);
  Counter loaded = progress.getCounter(Phase.LOADING_FSIMAGE, tokenStep);

  for (int n = 0; n < tokenCount; n++) {
    DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier();
    tokenId.readFields(in);
    // The long is read only after the identifier fields have been consumed.
    addPersistedDelegationToken(tokenId, in.readLong());
    loaded.increment();
  }

  progress.endStep(Phase.LOADING_FSIMAGE, tokenStep);
}
/**
 * Reads delegation keys from the fsimage stream and registers each one
 * through addKey. The stream is expected to hold a key count followed by
 * that many serialized keys. Progress is reported as a DELEGATION_KEYS
 * step of the LOADING_FSIMAGE phase.
 *
 * @param in stream the keys are read from
 * @throws IOException on error
 */
private synchronized void loadAllKeys(DataInput in) throws IOException {
  StartupProgress progress = NameNode.getStartupProgress();
  Step keyStep = new Step(StepType.DELEGATION_KEYS);
  progress.beginStep(Phase.LOADING_FSIMAGE, keyStep);

  int keyCount = in.readInt();
  progress.setTotal(Phase.LOADING_FSIMAGE, keyStep, keyCount);
  Counter loaded = progress.getCounter(Phase.LOADING_FSIMAGE, keyStep);

  for (int n = 0; n < keyCount; n++) {
    DelegationKey key = new DelegationKey();
    key.readFields(in);
    addKey(key);
    loaded.increment();
  }

  progress.endStep(Phase.LOADING_FSIMAGE, keyStep);
}
/**
 * Load an edit log, and apply the changes to the in-memory structure.
 * This is where we apply edits that we've been writing to disk all
 * along.
 *
 * The namesystem write lock is held while the edits are applied, and the
 * stream is closed before this method returns.
 *
 * @param edits stream to read the edits from; always closed by this method
 * @param expectedStartingTxId passed through to loadEditRecords
 * @param startOpt passed through to loadEditRecords
 * @param recovery passed through to loadEditRecords
 * @return the number of edits reported by loadEditRecords
 * @throws IOException if loading the edits or closing the stream fails
 */
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
    StartupOption startOpt, MetaRecoveryContext recovery)
    throws IOException {
  StartupProgress prog = NameNode.getStartupProgress();
  Step step = createStartupProgressStep(edits);
  prog.beginStep(Phase.LOADING_EDITS, step);
  fsNamesys.writeLock();
  try {
    long startTime = monotonicNow();
    FSImage.LOG.info("Start loading edits file " + edits.getName());
    long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
        startOpt, recovery);
    FSImage.LOG.info("Edits file " + edits.getName()
        + " of size " + edits.length() + " edits # " + numEdits
        + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
    return numEdits;
  } finally {
    try {
      edits.close();
    } finally {
      // close() may throw; the write lock must still be released and the
      // progress step ended, otherwise the namesystem stays locked.
      fsNamesys.writeUnlock();
      prog.endStep(Phase.LOADING_EDITS, step);
    }
  }
}
/** * Increment number of safe blocks if current block has * reached minimal replication. * @param replication current replication */ private synchronized void incrementSafeBlockCount(short replication) { if (replication == safeReplication) { this.blockSafe++; // Report startup progress only if we haven't completed startup yet. StartupProgress prog = NameNode.getStartupProgress(); if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) { if (this.awaitingReportedBlocksCounter == null) { this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS); } this.awaitingReportedBlocksCounter.increment(); } checkMode(); } }
/** * Load cache directives from the fsimage */ private void loadDirectives(DataInput in) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.CACHE_ENTRIES); prog.beginStep(Phase.LOADING_FSIMAGE, step); int numDirectives = in.readInt(); prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numDirectives; i++) { CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in); // Get pool reference by looking it up in the map final String poolName = info.getPool(); CacheDirective directive = new CacheDirective(info.getId(), info.getPath().toUri().getPath(), info.getReplication(), info.getExpiration().getAbsoluteMillis()); addCacheDirective(poolName, directive); counter.increment(); } prog.endStep(Phase.LOADING_FSIMAGE, step); }
/**
 * Builds the fixture: a fresh StartupProgress exposed through a mocked
 * servlet context, a servlet mock whose doGet runs the real
 * implementation, and a mocked response whose writer is backed by an
 * in-memory buffer.
 */
@Before
public void setUp() throws Exception {
  startupProgress = new StartupProgress();

  // The servlet context hands out the StartupProgress under test.
  ServletContext servletContext = mock(ServletContext.class);
  when(servletContext.getAttribute(
      NameNodeHttpServer.STARTUP_PROGRESS_ATTRIBUTE_KEY))
      .thenReturn(startupProgress);

  // Only doGet runs real code; the context lookup stays stubbed.
  servlet = mock(StartupProgressServlet.class);
  when(servlet.getServletContext()).thenReturn(servletContext);
  doCallRealMethod().when(servlet).doGet(
      any(HttpServletRequest.class), any(HttpServletResponse.class));

  req = mock(HttpServletRequest.class);

  // Whatever is written to the response writer lands in respOut.
  respOut = new ByteArrayOutputStream();
  PrintWriter responseWriter = new PrintWriter(respOut);
  resp = mock(HttpServletResponse.class);
  when(resp.getWriter()).thenReturn(responseWriter);
}
/**
 * Parses the secret manager section from the stream and hands the parsed
 * section, delegation keys and persisted tokens to
 * fsn.loadSecretManagerState. The section header is read first, then the
 * keys, then the tokens. Only the tokens are counted against the supplied
 * progress step.
 *
 * @param in stream positioned at the start of the section
 * @param prog startup progress to report to
 * @param currentStep progress step of the LOADING_FSIMAGE phase to update
 * @throws IOException if parsing from the stream fails
 */
private void loadSecretManagerSection(InputStream in, StartupProgress prog,
    Step currentStep) throws IOException {
  SecretManagerSection section = SecretManagerSection.parseDelimitedFrom(in);
  int keyCount = section.getNumKeys();
  int tokenCount = section.getNumTokens();

  ArrayList<SecretManagerSection.DelegationKey> parsedKeys =
      Lists.newArrayListWithCapacity(keyCount);
  ArrayList<SecretManagerSection.PersistToken> parsedTokens =
      Lists.newArrayListWithCapacity(tokenCount);

  for (int k = 0; k < keyCount; ++k) {
    parsedKeys.add(SecretManagerSection.DelegationKey.parseDelimitedFrom(in));
  }

  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, tokenCount);
  Counter tokenCounter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
  for (int t = 0; t < tokenCount; ++t) {
    parsedTokens.add(SecretManagerSection.PersistToken.parseDelimitedFrom(in));
    tokenCounter.increment();
  }

  fsn.loadSecretManagerState(section, parsedKeys, parsedTokens);
}
/**
 * Parses the cache manager section from the stream and loads it into the
 * cache manager. The section header is read first, then the pools, then
 * the directives. Only the pools are counted against the supplied
 * progress step.
 *
 * @param in stream positioned at the start of the section
 * @param prog startup progress to report to
 * @param currentStep progress step of the LOADING_FSIMAGE phase to update
 * @throws IOException if parsing from the stream fails
 */
private void loadCacheManagerSection(InputStream in, StartupProgress prog,
    Step currentStep) throws IOException {
  CacheManagerSection section = CacheManagerSection.parseDelimitedFrom(in);
  int poolCount = section.getNumPools();
  int directiveCount = section.getNumDirectives();

  ArrayList<CachePoolInfoProto> parsedPools =
      Lists.newArrayListWithCapacity(poolCount);
  ArrayList<CacheDirectiveInfoProto> parsedDirectives =
      Lists.newArrayListWithCapacity(directiveCount);

  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, poolCount);
  Counter poolCounter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
  for (int p = 0; p < poolCount; ++p) {
    parsedPools.add(CachePoolInfoProto.parseDelimitedFrom(in));
    poolCounter.increment();
  }

  for (int d = 0; d < directiveCount; ++d) {
    parsedDirectives.add(CacheDirectiveInfoProto.parseDelimitedFrom(in));
  }

  CacheManager.PersistState state =
      new CacheManager.PersistState(section, parsedPools, parsedDirectives);
  fsn.getCacheManager().loadState(state);
}
/**
 * Parses the inode section from the stream.
 * The section header supplies the last inode id, which is used to reset
 * the directory's id, and the number of inodes that follow. The root
 * inode is loaded through loadRootINode; every other inode is loaded
 * through loadINode and added to the inode map. Each parsed inode is
 * counted against the supplied progress step.
 *
 * @param in stream positioned at the start of the section
 * @param prog startup progress to report to
 * @param currentStep progress step of the LOADING_FSIMAGE phase to update
 * @throws IOException if parsing from the stream fails
 */
void loadINodeSection(InputStream in, StartupProgress prog,
    Step currentStep) throws IOException {
  INodeSection s = INodeSection.parseDelimitedFrom(in);
  fsn.dir.resetLastInodeId(s.getLastInodeId());
  long numInodes = s.getNumInodes();
  LOG.info("Loading " + numInodes + " INodes.");
  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
  Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
  // The index is a long to match numInodes: an int index would wrap to a
  // negative value past Integer.MAX_VALUE and never reach the bound.
  for (long i = 0; i < numInodes; ++i) {
    INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
    if (p.getId() == INodeId.ROOT_INODE_ID) {
      loadRootINode(p);
    } else {
      INode n = loadINode(p);
      dir.addToInodeMap(n);
    }
    counter.increment();
  }
}
/**
 * Load an edit log, and apply the changes to the in-memory structure.
 * This is where we apply edits that we've been writing to disk all
 * along.
 *
 * The namesystem write lock is held while the edits are applied, and the
 * stream is closed before this method returns.
 *
 * @param edits stream to read the edits from; always closed by this method
 * @param expectedStartingTxId passed through to loadEditRecords
 * @param startOpt passed through to loadEditRecords
 * @param recovery passed through to loadEditRecords
 * @return the number of edits reported by loadEditRecords
 * @throws IOException if loading the edits or closing the stream fails
 */
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
    StartupOption startOpt, MetaRecoveryContext recovery)
    throws IOException {
  StartupProgress prog = NameNode.getStartupProgress();
  Step step = createStartupProgressStep(edits);
  prog.beginStep(Phase.LOADING_EDITS, step);
  fsNamesys.writeLock();
  try {
    long startTime = now();
    FSImage.LOG.info("Start loading edits file " + edits.getName());
    long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
        startOpt, recovery);
    FSImage.LOG.info("Edits file " + edits.getName()
        + " of size " + edits.length() + " edits # " + numEdits
        + " loaded in " + (now()-startTime)/1000 + " seconds");
    return numEdits;
  } finally {
    try {
      edits.close();
    } finally {
      // close() may throw; the write lock must still be released and the
      // progress step ended, otherwise the namesystem stays locked.
      fsNamesys.writeUnlock();
      prog.endStep(Phase.LOADING_EDITS, step);
    }
  }
}
@Test public void testGenerateStartupProgress() throws Exception { cluster.waitClusterUp(); NamenodeJspHelper.HealthJsp jsp = new NamenodeJspHelper.HealthJsp(); StartupProgress prog = NameNode.getStartupProgress(); JspWriter out = mock(JspWriter.class); jsp.generateStartupProgress(out, prog); ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class); verify(out, atLeastOnce()).println(captor.capture()); List<String> contents = captor.getAllValues(); // Verify 100% overall completion and all phases mentioned in output. Assert.assertTrue(containsMatch(contents, "Elapsed Time\\:")); Assert.assertTrue(containsMatch(contents, "Percent Complete\\:.*?100\\.00%")); Assert.assertTrue(containsMatch(contents, LOADING_FSIMAGE.getDescription())); Assert.assertTrue(containsMatch(contents, LOADING_EDITS.getDescription())); Assert.assertTrue(containsMatch(contents, SAVING_CHECKPOINT.getDescription())); Assert.assertTrue(containsMatch(contents, SAFEMODE.getDescription())); }
/**
 * Private helper method to load delegation keys from fsimage.
 * Reads a key count from the stream, then deserializes that many keys and
 * passes each one to addKey, reporting progress as a DELEGATION_KEYS step
 * of the LOADING_FSIMAGE phase.
 *
 * @param in stream the keys are read from
 * @throws IOException if reading from the stream fails
 */
private synchronized void loadAllKeys(DataInput in) throws IOException {
  StartupProgress startupProg = NameNode.getStartupProgress();
  Step delegationKeysStep = new Step(StepType.DELEGATION_KEYS);
  startupProg.beginStep(Phase.LOADING_FSIMAGE, delegationKeysStep);

  int totalKeys = in.readInt();
  startupProg.setTotal(Phase.LOADING_FSIMAGE, delegationKeysStep, totalKeys);
  Counter keysRead =
      startupProg.getCounter(Phase.LOADING_FSIMAGE, delegationKeysStep);

  int index = 0;
  while (index < totalKeys) {
    DelegationKey delegationKey = new DelegationKey();
    delegationKey.readFields(in);
    addKey(delegationKey);
    keysRead.increment();
    index++;
  }

  startupProg.endStep(Phase.LOADING_FSIMAGE, delegationKeysStep);
}
/**
 * Load an edit log, and apply the changes to the in-memory structure.
 * This is where we apply edits that we've been writing to disk all
 * along.
 *
 * The namesystem write lock is held while the edits are applied, and the
 * stream is closed before this method returns.
 *
 * @param edits stream to read the edits from; always closed by this method
 * @param expectedStartingTxId passed through to loadEditRecords
 * @param recovery passed through to loadEditRecords
 * @return the number of edits reported by loadEditRecords
 * @throws IOException if loading the edits or closing the stream fails
 */
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
    MetaRecoveryContext recovery) throws IOException {
  StartupProgress prog = NameNode.getStartupProgress();
  Step step = createStartupProgressStep(edits);
  prog.beginStep(Phase.LOADING_EDITS, step);
  fsNamesys.writeLock();
  try {
    long startTime = now();
    FSImage.LOG.info("Start loading edits file " + edits.getName());
    long numEdits = loadEditRecords(edits, false,
        expectedStartingTxId, recovery);
    FSImage.LOG.info("Edits file " + edits.getName()
        + " of size " + edits.length() + " edits # " + numEdits
        + " loaded in " + (now()-startTime)/1000 + " seconds");
    return numEdits;
  } finally {
    try {
      edits.close();
    } finally {
      // close() may throw; the write lock must still be released and the
      // progress step ended, otherwise the namesystem stays locked.
      fsNamesys.writeUnlock();
      prog.endStep(Phase.LOADING_EDITS, step);
    }
  }
}