/** * Private helper methods to save delegation keys and tokens in fsimage */ private synchronized void saveCurrentTokens(DataOutputStream out, String sdPath) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.DELEGATION_TOKENS, sdPath); prog.beginStep(Phase.SAVING_CHECKPOINT, step); prog.setTotal(Phase.SAVING_CHECKPOINT, step, currentTokens.size()); Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(currentTokens.size()); Iterator<DelegationTokenIdentifier> iter = currentTokens.keySet() .iterator(); while (iter.hasNext()) { DelegationTokenIdentifier id = iter.next(); id.write(out); DelegationTokenInformation info = currentTokens.get(id); out.writeLong(info.getRenewDate()); counter.increment(); } prog.endStep(Phase.SAVING_CHECKPOINT, step); }
private synchronized void saveAllKeys(DataOutputStream out, String sdPath) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.DELEGATION_KEYS, sdPath); prog.beginStep(Phase.SAVING_CHECKPOINT, step); prog.setTotal(Phase.SAVING_CHECKPOINT, step, currentTokens.size()); Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(allKeys.size()); Iterator<Integer> iter = allKeys.keySet().iterator(); while (iter.hasNext()) { Integer key = iter.next(); allKeys.get(key).write(out); counter.increment(); } prog.endStep(Phase.SAVING_CHECKPOINT, step); }
/** * Private helper methods to load Delegation tokens from fsimage */ private synchronized void loadCurrentTokens(DataInput in) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.DELEGATION_TOKENS); prog.beginStep(Phase.LOADING_FSIMAGE, step); int numberOfTokens = in.readInt(); prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfTokens); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numberOfTokens; i++) { DelegationTokenIdentifier id = new DelegationTokenIdentifier(); id.readFields(in); long expiryTime = in.readLong(); addPersistedDelegationToken(id, expiryTime); counter.increment(); } prog.endStep(Phase.LOADING_FSIMAGE, step); }
/** * Private helper method to load delegation keys from fsimage. * @throws IOException on error */ private synchronized void loadAllKeys(DataInput in) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.DELEGATION_KEYS); prog.beginStep(Phase.LOADING_FSIMAGE, step); int numberOfKeys = in.readInt(); prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfKeys); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numberOfKeys; i++) { DelegationKey value = new DelegationKey(); value.readFields(in); addKey(value); counter.increment(); } prog.endStep(Phase.LOADING_FSIMAGE, step); }
/** * Load an edit log, and apply the changes to the in-memory structure * This is where we apply edits that we've been writing to disk all * along. */ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = createStartupProgressStep(edits); prog.beginStep(Phase.LOADING_EDITS, step); fsNamesys.writeLock(); try { long startTime = monotonicNow(); FSImage.LOG.info("Start loading edits file " + edits.getName()); long numEdits = loadEditRecords(edits, false, expectedStartingTxId, startOpt, recovery); FSImage.LOG.info("Edits file " + edits.getName() + " of size " + edits.length() + " edits # " + numEdits + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds"); return numEdits; } finally { edits.close(); fsNamesys.writeUnlock(); prog.endStep(Phase.LOADING_EDITS, step); } }
/** * Increment number of safe blocks if current block has * reached minimal replication. * @param replication current replication */ private synchronized void incrementSafeBlockCount(short replication) { if (replication == safeReplication) { this.blockSafe++; // Report startup progress only if we haven't completed startup yet. StartupProgress prog = NameNode.getStartupProgress(); if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) { if (this.awaitingReportedBlocksCounter == null) { this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS); } this.awaitingReportedBlocksCounter.increment(); } checkMode(); } }
/** * Load cache directives from the fsimage */ private void loadDirectives(DataInput in) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.CACHE_ENTRIES); prog.beginStep(Phase.LOADING_FSIMAGE, step); int numDirectives = in.readInt(); prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numDirectives; i++) { CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in); // Get pool reference by looking it up in the map final String poolName = info.getPool(); CacheDirective directive = new CacheDirective(info.getId(), info.getPath().toUri().getPath(), info.getReplication(), info.getExpiration().getAbsoluteMillis()); addCacheDirective(poolName, directive); counter.increment(); } prog.endStep(Phase.LOADING_FSIMAGE, step); }
@Override public void getMetrics(MetricsCollector collector, boolean all) { StartupProgressView prog = startupProgress.createView(); MetricsRecordBuilder builder = collector.addRecord( STARTUP_PROGRESS_METRICS_INFO); builder.addCounter(info("ElapsedTime", "overall elapsed time"), prog.getElapsedTime()); builder.addGauge(info("PercentComplete", "overall percent complete"), prog.getPercentComplete()); for (Phase phase: prog.getPhases()) { addCounter(builder, phase, "Count", " count", prog.getCount(phase)); addCounter(builder, phase, "ElapsedTime", " elapsed time", prog.getElapsedTime(phase)); addCounter(builder, phase, "Total", " total", prog.getTotal(phase)); addGauge(builder, phase, "PercentComplete", " percent complete", prog.getPercentComplete(phase)); } }
private void loadSecretManagerSection(InputStream in, StartupProgress prog, Step currentStep) throws IOException { SecretManagerSection s = SecretManagerSection.parseDelimitedFrom(in); int numKeys = s.getNumKeys(), numTokens = s.getNumTokens(); ArrayList<SecretManagerSection.DelegationKey> keys = Lists .newArrayListWithCapacity(numKeys); ArrayList<SecretManagerSection.PersistToken> tokens = Lists .newArrayListWithCapacity(numTokens); for (int i = 0; i < numKeys; ++i) keys.add(SecretManagerSection.DelegationKey.parseDelimitedFrom(in)); prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numTokens); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep); for (int i = 0; i < numTokens; ++i) { tokens.add(SecretManagerSection.PersistToken.parseDelimitedFrom(in)); counter.increment(); } fsn.loadSecretManagerState(s, keys, tokens); }
private void loadCacheManagerSection(InputStream in, StartupProgress prog, Step currentStep) throws IOException { CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in); int numPools = s.getNumPools(); ArrayList<CachePoolInfoProto> pools = Lists .newArrayListWithCapacity(numPools); ArrayList<CacheDirectiveInfoProto> directives = Lists .newArrayListWithCapacity(s.getNumDirectives()); prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numPools); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep); for (int i = 0; i < numPools; ++i) { pools.add(CachePoolInfoProto.parseDelimitedFrom(in)); counter.increment(); } for (int i = 0; i < s.getNumDirectives(); ++i) directives.add(CacheDirectiveInfoProto.parseDelimitedFrom(in)); fsn.getCacheManager().loadState( new CacheManager.PersistState(s, pools, directives)); }
void loadINodeSection(InputStream in, StartupProgress prog, Step currentStep) throws IOException { INodeSection s = INodeSection.parseDelimitedFrom(in); fsn.dir.resetLastInodeId(s.getLastInodeId()); long numInodes = s.getNumInodes(); LOG.info("Loading " + numInodes + " INodes."); prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep); for (int i = 0; i < numInodes; ++i) { INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in); if (p.getId() == INodeId.ROOT_INODE_ID) { loadRootINode(p); } else { INode n = loadINode(p); dir.addToInodeMap(n); } counter.increment(); } }
/** * Load an edit log, and apply the changes to the in-memory structure * This is where we apply edits that we've been writing to disk all * along. */ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = createStartupProgressStep(edits); prog.beginStep(Phase.LOADING_EDITS, step); fsNamesys.writeLock(); try { long startTime = now(); FSImage.LOG.info("Start loading edits file " + edits.getName()); long numEdits = loadEditRecords(edits, false, expectedStartingTxId, startOpt, recovery); FSImage.LOG.info("Edits file " + edits.getName() + " of size " + edits.length() + " edits # " + numEdits + " loaded in " + (now()-startTime)/1000 + " seconds"); return numEdits; } finally { edits.close(); fsNamesys.writeUnlock(); prog.endStep(Phase.LOADING_EDITS, step); } }
/** * Prints one line of content for a phase in the Startup Progress report. * * @param fout FormattedWriter to receive output * @param view StartupProgressView containing information to print * @param phase Phase to print * @throws IOException thrown if there is an I/O error */ private void printPhase(FormattedWriter fout, StartupProgressView view, Phase phase) throws IOException { StringBuilder phaseLine = new StringBuilder(); phaseLine.append(phase.getDescription()); String file = view.getFile(phase); if (file != null) { phaseLine.append(" ").append(file); } long size = view.getSize(phase); if (size != Long.MIN_VALUE) { phaseLine.append(" (").append(StringUtils.byteDesc(size)).append(")"); } fout.println("<td class=\"startupdesc\">%s</td>", phaseLine.toString()); fout.println("<td>%s</td>", StringUtils.formatPercent( view.getPercentComplete(phase), 2)); fout.println("<td>%s</td>", view.getStatus(phase) == Status.PENDING ? "" : StringUtils.formatTime(view.getElapsedTime(phase))); }
/** * Prints one line of content for a step in the Startup Progress report. * * @param fout FormattedWriter to receive output * @param view StartupProgressView containing information to print * @param phase Phase to print * @param step Step to print * @throws IOException thrown if there is an I/O error */ private void printStep(FormattedWriter fout, StartupProgressView view, Phase phase, Step step) throws IOException { StringBuilder stepLine = new StringBuilder(); String file = step.getFile(); if (file != null) { stepLine.append(file); } long size = step.getSize(); if (size != Long.MIN_VALUE) { stepLine.append(" (").append(StringUtils.byteDesc(size)).append(")"); } StepType type = step.getType(); if (type != null) { stepLine.append(" ").append(type.getDescription()); } fout.println("<td class=\"startupdesc\">%s (%d/%d)</td>", stepLine.toString(), view.getCount(phase, step), view.getTotal(phase, step)); fout.println("<td>%s</td>", StringUtils.formatPercent( view.getPercentComplete(phase), 2)); fout.println("<td>%s</td>", view.getStatus(phase) == Status.PENDING ? "" : StringUtils.formatTime(view.getElapsedTime(phase))); }
/** * Private helper method to load delegation keys from fsimage. * @param in * @throws IOException */ private synchronized void loadAllKeys(DataInput in) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.DELEGATION_KEYS); prog.beginStep(Phase.LOADING_FSIMAGE, step); int numberOfKeys = in.readInt(); prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfKeys); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numberOfKeys; i++) { DelegationKey value = new DelegationKey(); value.readFields(in); addKey(value); counter.increment(); } prog.endStep(Phase.LOADING_FSIMAGE, step); }