/**
 * Creates a journal set together with the worker pool it uses.
 *
 * The minimum journal counts are read from {@code dfs.name.edits.dir.minimum}
 * (default 1) and {@code dfs.name.edits.dir.minimum.nonlocal} (default 0).
 *
 * @param conf configuration the minimum journal counts are read from
 * @param image image reference stored on this instance
 * @param storage storage reference stored on this instance
 * @param numJournals size of the fixed worker thread pool
 * @param metrics metrics reference stored on this instance
 */
JournalSet(Configuration conf, FSImage image, NNStorage storage,
    int numJournals, NameNodeMetrics metrics) {
  this.minimumNumberOfJournals =
      conf.getInt("dfs.name.edits.dir.minimum", 1);
  this.minimumNumberOfNonLocalJournals =
      conf.getInt("dfs.name.edits.dir.minimum.nonlocal", 0);
  this.image = image;
  this.storage = storage;

  // Name the pool threads so they are recognizable in thread dumps.
  ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
  factoryBuilder.setNameFormat("JournalSet Worker %d");
  ThreadFactory workerThreads = factoryBuilder.build();
  this.executor = Executors.newFixedThreadPool(numJournals, workerThreads);

  this.metrics = metrics;
}
/**
 * Constructs a custom journal manager for the given URI.
 *
 * The implementation class is looked up from the configuration by the
 * URI scheme and instantiated reflectively through its
 * {@code (Configuration, URI, NamespaceInfo, NameNodeMetrics)} constructor.
 *
 * @param conf configuration used for the class lookup and passed to the
 *             constructor
 * @param uri URI of the journal to construct
 * @param nsInfo namespace info passed to the constructor
 * @param metrics metrics passed to the constructor
 * @return the constructed journal manager
 * @throws IllegalArgumentException if the constructor cannot be found or
 *         the instance cannot be created (the original documentation also
 *         lists "no class is configured for uri")
 */
public static JournalManager createJournal(Configuration conf, URI uri,
    NamespaceInfo nsInfo, NameNodeMetrics metrics) {
  final Class<? extends JournalManager> journalClass =
      getJournalClass(conf, uri.getScheme());
  try {
    return journalClass
        .getConstructor(Configuration.class, URI.class,
            NamespaceInfo.class, NameNodeMetrics.class)
        .newInstance(conf, uri, nsInfo, metrics);
  } catch (Exception e) {
    throw new IllegalArgumentException("Unable to construct journal, "
        + uri, e);
  }
}
/**
 * Instantiates an FSNamesystem loaded from the image and edits
 * directories specified in the passed Configuration.
 *
 * If the startup option is {@code RECOVER} the namesystem is put into
 * safe mode before loading. The time spent loading is logged and, when
 * NameNode metrics are available, recorded there.
 *
 * @param conf the Configuration which specifies the storage directories
 *             from which to load
 * @return an FSNamesystem which contains the loaded namespace
 * @throws IOException if loading fails; the FSImage is closed before the
 *         exception is rethrown
 */
public static FSNamesystem loadFromDisk(Configuration conf)
    throws IOException {
  checkConfiguration(conf);
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  if (startOpt == StartupOption.RECOVER) {
    namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  long loadStart = now();
  String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
  try {
    namesystem.loadFSImage(startOpt, fsImage,
        HAUtil.isHAEnabled(conf, nameserviceId));
  } catch (IOException ioe) {
    // Release the image's resources instead of leaking them when the
    // load fails; the caller still sees the original exception.
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  long timeTakenToLoadFSImage = now() - loadStart;
  LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage
      + " msecs");

  // The metrics object is null-checked because it is not always set.
  NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
  if (nnMetrics != null) {
    nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
  }
  return namesystem;
}
/** * Instantiates an FSNamesystem loaded from the image and edits * directories specified in the passed Configuration. * * @param conf * the Configuration which specifies the storage directories * from which to load * @return an FSNamesystem which contains the loaded namespace * @throws IOException * if loading fails */ public static FSNamesystem loadFromDisk(Configuration conf, NameNode namenode) throws IOException { FSNamesystem namesystem = new FSNamesystem(conf, namenode); StartupOption startOpt = NameNode.getStartupOption(conf); if (startOpt == StartupOption.RECOVER) { namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); } long loadStart = now(); namesystem.dir .imageLoadComplete(); //HOP: this function was called inside the namesystem.loadFSImage(...) which is commented out long timeTakenToLoadFSImage = now() - loadStart; LOG.info( "Finished loading FSImage in " + timeTakenToLoadFSImage + " ms"); NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics(); if (nnMetrics != null) { nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage); } return namesystem; }
/**
 * Builds an FSNamesystem and loads its namespace from the image and edits
 * directories named in the configuration.
 *
 * A {@code RECOVER} startup option puts the namesystem into safe mode
 * before loading. The load duration is logged and, when NameNode metrics
 * are available, recorded there.
 *
 * @param conf the Configuration which specifies the storage directories
 *             from which to load
 * @return an FSNamesystem which contains the loaded namespace
 * @throws IOException if loading fails; the FSImage is closed before the
 *         exception is rethrown
 */
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  checkConfiguration(conf);

  final FSImage image = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  final FSNamesystem fsn = new FSNamesystem(conf, image, false);

  final StartupOption option = NameNode.getStartupOption(conf);
  if (option == StartupOption.RECOVER) {
    fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  final long startedAt = monotonicNow();
  try {
    fsn.loadFSImage(option);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    image.close();
    throw ioe;
  }
  final long elapsedMs = monotonicNow() - startedAt;
  LOG.info("Finished loading FSImage in " + elapsedMs + " msecs");

  final NameNodeMetrics nameNodeMetrics = NameNode.getNameNodeMetrics();
  if (nameNodeMetrics != null) {
    nameNodeMetrics.setFsImageLoadTime((int) elapsedMs);
  }
  return fsn;
}
public final void processCacheReport(final DatanodeID datanodeID, final List<Long> blockIds) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); final long endTime; try { final DatanodeDescriptor datanode = blockManager.getDatanodeManager().getDatanode(datanodeID); if (datanode == null || !datanode.isAlive) { throw new IOException( "processCacheReport from dead or unregistered datanode: " + datanode); } processCacheReportImpl(datanode, blockIds); } finally { endTime = Time.monotonicNow(); namesystem.writeUnlock(); } // Log the block report processing stats from Namenode perspective final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); if (metrics != null) { metrics.addCacheBlockReport((int) (endTime - startTime)); } LOG.debug("Processed cache report from {}, blocks: {}, " + "processing time: {} msecs", datanodeID, blockIds.size(), (endTime - startTime)); }
/** * Regression test for HDFS-1112/HDFS-3020. Ensures that, even if * logSync isn't called periodically, the edit log will sync itself. */ @Test public void testAutoSync() throws Exception { File logDir = new File(TEST_DIR, "testAutoSync"); logDir.mkdirs(); FSEditLog log = FSImageTestUtil.createStandaloneEditLog(logDir); String oneKB = StringUtils.byteToHexString( new byte[500]); try { log.openForWrite(); NameNodeMetrics mockMetrics = Mockito.mock(NameNodeMetrics.class); log.setMetricsForTests(mockMetrics); for (int i = 0; i < 400; i++) { log.logDelete(oneKB, 1L, false); } // After ~400KB, we're still within the 512KB buffer size Mockito.verify(mockMetrics, Mockito.times(0)).addSync(Mockito.anyLong()); // After ~400KB more, we should have done an automatic sync for (int i = 0; i < 400; i++) { log.logDelete(oneKB, 1L, false); } Mockito.verify(mockMetrics, Mockito.times(1)).addSync(Mockito.anyLong()); } finally { log.close(); } }
/**
 * Builds an FSNamesystem and loads its namespace from the image and edits
 * directories named in the configuration.
 *
 * A {@code RECOVER} startup option puts the namesystem into safe mode
 * before loading. After a successful load the duration is logged and,
 * when NameNode metrics are available, recorded there. Finally the
 * directory's reserved statuses are created from the namesystem's CTime.
 *
 * @param conf the Configuration which specifies the storage directories
 *             from which to load
 * @return an FSNamesystem which contains the loaded namespace
 * @throws IOException if loading fails; the FSImage is closed before the
 *         exception is rethrown
 */
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  checkConfiguration(conf);

  final FSImage image = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  final FSNamesystem fsn = new FSNamesystem(conf, image, false);

  final StartupOption option = NameNode.getStartupOption(conf);
  if (option == StartupOption.RECOVER) {
    fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  final long startedAt = monotonicNow();
  try {
    fsn.loadFSImage(option);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    image.close();
    throw ioe;
  }
  final long elapsedMs = monotonicNow() - startedAt;
  LOG.info("Finished loading FSImage in " + elapsedMs + " msecs");

  final NameNodeMetrics nameNodeMetrics = NameNode.getNameNodeMetrics();
  if (nameNodeMetrics != null) {
    nameNodeMetrics.setFsImageLoadTime((int) elapsedMs);
  }

  fsn.getFSDirectory().createReservedStatuses(fsn.getCTime());
  return fsn;
}
public final void processCacheReport(final DatanodeID datanodeID, final List<Long> blockIds) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); final long endTime; try { final DatanodeDescriptor datanode = blockManager.getDatanodeManager().getDatanode(datanodeID); if (datanode == null || !datanode.isRegistered()) { throw new IOException( "processCacheReport from dead or unregistered datanode: " + datanode); } processCacheReportImpl(datanode, blockIds); } finally { endTime = Time.monotonicNow(); namesystem.writeUnlock(); } // Log the block report processing stats from Namenode perspective final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); if (metrics != null) { metrics.addCacheBlockReport((int) (endTime - startTime)); } LOG.debug("Processed cache report from {}, blocks: {}, " + "processing time: {} msecs", datanodeID, blockIds.size(), (endTime - startTime)); }
private void processQueue() { while (namesystem.isRunning()) { NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); try { Runnable action = queue.take(); // batch as many operations in the write lock until the queue // runs dry, or the max lock hold is reached. int processed = 0; namesystem.writeLock(); metrics.setBlockOpsQueued(queue.size() + 1); try { long start = Time.monotonicNow(); do { processed++; action.run(); if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) { break; } action = queue.poll(); } while (action != null); } finally { namesystem.writeUnlock(); metrics.addBlockOpsBatched(processed - 1); } } catch (InterruptedException e) { // ignore unless thread was specifically interrupted. if (Thread.interrupted()) { break; } } } queue.clear(); }
/** * Regression test for HDFS-1112/HDFS-3020. Ensures that, even if * logSync isn't called periodically, the edit log will sync itself. */ @Test public void testAutoSync() throws Exception { File logDir = new File(TEST_DIR, "testAutoSync"); logDir.mkdirs(); FSEditLog log = FSImageTestUtil.createStandaloneEditLog(logDir); String oneKB = StringUtils.byteToHexString( new byte[500]); try { log.openForWrite(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); NameNodeMetrics mockMetrics = Mockito.mock(NameNodeMetrics.class); log.setMetricsForTests(mockMetrics); for (int i = 0; i < 400; i++) { log.logDelete(oneKB, 1L, false); } // After ~400KB, we're still within the 512KB buffer size Mockito.verify(mockMetrics, Mockito.times(0)).addSync(Mockito.anyLong()); // After ~400KB more, we should have done an automatic sync for (int i = 0; i < 400; i++) { log.logDelete(oneKB, 1L, false); } Mockito.verify(mockMetrics, Mockito.times(1)).addSync(Mockito.anyLong()); } finally { log.close(); } }
/**
 * Builds an FSNamesystem and loads its namespace from the image and edits
 * directories named in the configuration.
 *
 * A {@code RECOVER} startup option puts the namesystem into safe mode
 * before loading. The load duration is logged and, when NameNode metrics
 * are available, recorded there.
 *
 * @param conf the Configuration which specifies the storage directories
 *             from which to load
 * @return an FSNamesystem which contains the loaded namespace
 * @throws IOException if loading fails; the FSImage is closed before the
 *         exception is rethrown
 */
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  checkConfiguration(conf);

  final FSImage image = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  final FSNamesystem fsn = new FSNamesystem(conf, image, false);

  final StartupOption option = NameNode.getStartupOption(conf);
  if (option == StartupOption.RECOVER) {
    fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  final long startedAt = now();
  try {
    fsn.loadFSImage(option);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    image.close();
    throw ioe;
  }
  final long elapsedMs = now() - startedAt;
  LOG.info("Finished loading FSImage in " + elapsedMs + " msecs");

  final NameNodeMetrics nameNodeMetrics = NameNode.getNameNodeMetrics();
  if (nameNodeMetrics != null) {
    nameNodeMetrics.setFsImageLoadTime((int) elapsedMs);
  }
  return fsn;
}
/**
 * Collects the image managers for an FSImage.
 *
 * One {@code FileImageManager} is added per IMAGE storage directory,
 * followed by every non-file journal manager that is also an
 * {@code ImageManager} and reports image storage. Each location is passed
 * to {@code validate} together with {@code fsDirs} before it is added.
 * Image metrics are updated once the list is complete.
 *
 * @param fsImage image whose storage and edit log are inspected
 * @param fsDirs image locations that each manager is validated against
 * @param fsEditsDirs edits locations; not read by this constructor
 * @param metrics metrics reference stored on this instance
 * @throws IOException declared by the constructor; presumably raised by
 *         validation or the metrics update - confirm against those helpers
 */
public ImageSet(FSImage fsImage, Collection<URI> fsDirs,
    Collection<URI> fsEditsDirs, NameNodeMetrics metrics) throws IOException {
  this.imageManagers = new ArrayList<ImageManager>();
  this.metrics = metrics;

  // get all IMAGE directories
  for (Iterator<StorageDirectory> dirs =
      fsImage.storage.dirIterator(NameNodeDirType.IMAGE); dirs.hasNext();) {
    StorageDirectory dir = dirs.next();
    validate(dir.getRoot(), fsDirs);
    imageManagers.add(new FileImageManager(dir, fsImage.storage));
  }

  // add all journal managers that store images
  for (JournalManager journal
      : fsImage.editLog.getNonFileJournalManagers()) {
    if (!(journal instanceof ImageManager) || !journal.hasImageStorage()) {
      continue;
    }
    ImageManager manager = (ImageManager) journal;
    validate(manager.getURI(), fsDirs);
    imageManagers.add(manager);
  }

  // initialize metrics
  updateImageMetrics();
}
/** * Initialize name-node. * */ protected void initialize() throws IOException { // set service-level authorization security policy if (serviceAuthEnabled = getConf().getBoolean( ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) { PolicyProvider policyProvider = (PolicyProvider)(ReflectionUtils.newInstance( getConf().getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, HDFSPolicyProvider.class, PolicyProvider.class), getConf())); SecurityUtil.setPolicy(new ConfiguredPolicy(getConf(), policyProvider)); } // This is a check that the port is free // create a socket and bind to it, throw exception if port is busy // This has to be done before we are reading Namesystem not to waste time and fail fast NetUtils.isSocketBindable(getClientProtocolAddress(getConf())); NetUtils.isSocketBindable(getDNProtocolAddress(getConf())); NetUtils.isSocketBindable(getHttpServerAddress(getConf())); long serverVersion = ClientProtocol.versionID; this.clientProtocolMethodsFingerprint = ProtocolSignature .getMethodsSigFingerPrint(ClientProtocol.class, serverVersion); myMetrics = new NameNodeMetrics(getConf(), this); this.clusterName = getConf().get(FSConstants.DFS_CLUSTER_NAME); this.namesystem = new FSNamesystem(this, getConf()); // HACK: from removal of FSNamesystem.getFSNamesystem(). JspHelper.fsn = this.namesystem; this.startDNServer(); startHttpServer(getConf()); }
public final void processCacheReport(final DatanodeID datanodeID, final List<Long> blockIds) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); final long endTime; try { final DatanodeDescriptor datanode = blockManager.getDatanodeManager().getDatanode(datanodeID); if (datanode == null || !datanode.isAlive) { throw new IOException( "processCacheReport from dead or unregistered datanode: " + datanode); } processCacheReportImpl(datanode, blockIds); } finally { endTime = Time.monotonicNow(); namesystem.writeUnlock(); } // Log the block report processing stats from Namenode perspective final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); if (metrics != null) { metrics.addCacheBlockReport((int) (endTime - startTime)); } if (LOG.isDebugEnabled()) { LOG.debug("Processed cache report from " + datanodeID + ", blocks: " + blockIds.size() + ", processing time: " + (endTime - startTime) + " msecs"); } }
/** * Initialize name-node. * * @param conf the configuration */ private void initialize(Configuration conf) throws IOException { InetSocketAddress socAddr = NameNode.getAddress(conf); int handlerCount = conf.getInt("dfs.namenode.handler.count", 10); // set service-level authorization security policy if (serviceAuthEnabled = conf.getBoolean( ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) { PolicyProvider policyProvider = (PolicyProvider)(ReflectionUtils.newInstance( conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, HDFSPolicyProvider.class, PolicyProvider.class), conf)); SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider)); } // create rpc server this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf); // The rpc-server port can be ephemeral... ensure we have the correct info this.serverAddress = this.server.getListenerAddress(); FileSystem.setDefaultUri(conf, getUri(serverAddress)); LOG.info("Namenode up at: " + this.serverAddress); myMetrics = new NameNodeMetrics(conf, this); this.namesystem = new FSNamesystem(this, conf); startHttpServer(conf); this.server.start(); //start RPC server startTrashEmptier(conf); }
/**
 * Creates the NameNode metrics for the given role and stores them in the
 * static {@code metrics} field, replacing any previous value.
 *
 * @param conf configuration passed to the metrics factory
 * @param role namenode role passed to the metrics factory
 */
static void initMetrics(Configuration conf, NamenodeRole role) {
  final NameNodeMetrics created = NameNodeMetrics.create(conf, role);
  metrics = created;
}
/**
 * Returns the value of the static {@code metrics} field.
 *
 * @return the stored NameNode metrics; presumably {@code null} until the
 *         metrics have been initialized - verify against the field's
 *         declaration
 */
public static NameNodeMetrics getNameNodeMetrics() {
  return metrics;
}
/**
 * Handles an uploaded checkpoint image.
 *
 * The request is validated first. The upload itself runs as the current
 * user: the transaction id is registered as "checkpointing", the image is
 * received and its digest saved, the checkpoint image is renamed into
 * place, and old storage is purged. A 409 (CONFLICT) response is sent
 * when the transaction id is already being checkpointed or an image for
 * it already exists.
 *
 * @param request the upload request; its input stream carries the image
 * @param response used to report conflicts and failures
 * @throws ServletException declared by the servlet contract
 * @throws IOException on any failure; the original exception is kept as
 *         the cause and a 410 (GONE) response is sent first
 */
@Override
protected void doPut(final HttpServletRequest request,
    final HttpServletResponse response) throws ServletException, IOException {
  try {
    ServletContext context = getServletContext();
    final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
    final Configuration conf = (Configuration) getServletContext()
        .getAttribute(JspHelper.CURRENT_CONF);
    final PutImageParams parsedParams = new PutImageParams(request, response,
        conf);
    final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();

    validateRequest(context, conf, request, response, nnImage,
        parsedParams.getStorageInfoString());

    UserGroupInformation.getCurrentUser().doAs(
        new PrivilegedExceptionAction<Void>() {

          @Override
          public Void run() throws Exception {

            final long txid = parsedParams.getTxId();

            final NameNodeFile nnf = parsedParams.getNameNodeFile();

            // Refuse the upload when this txid is already being handled.
            if (!nnImage.addToCheckpointing(txid)) {
              response.sendError(HttpServletResponse.SC_CONFLICT,
                  "Either current namenode is checkpointing or another"
                      + " checkpointer is already in the process of "
                      + "uploading a checkpoint made at transaction ID "
                      + txid);
              return null;
            }
            try {
              // Refuse the upload when an image for this txid exists.
              if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
                response.sendError(HttpServletResponse.SC_CONFLICT,
                    "Either current namenode has checkpointed or "
                        + "another checkpointer already uploaded an "
                        + "checkpoint for txid " + txid);
                return null;
              }

              InputStream stream = request.getInputStream();
              try {
                long start = monotonicNow();
                MD5Hash downloadImageDigest = TransferFsImage
                    .handleUploadImageRequest(request, txid,
                        nnImage.getStorage(), stream,
                        parsedParams.getFileSize(), getThrottler(conf));
                nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
                    downloadImageDigest);
                // Metrics non-null only when used inside name node
                if (metrics != null) {
                  long elapsed = monotonicNow() - start;
                  metrics.addPutImage(elapsed);
                }
                // Now that we have a new checkpoint, we might be able to
                // remove some old ones.
                nnImage.purgeOldStorage(nnf);
              } finally {
                stream.close();
              }
            } finally {
              // Always release the txid, on success and on failure.
              nnImage.removeFromCheckpointing(txid);
            }
            return null;
          }

        });
  } catch (Throwable t) {
    String errMsg = "PutImage failed. " + StringUtils.stringifyException(t);
    response.sendError(HttpServletResponse.SC_GONE, errMsg);
    // Keep the original failure as the cause so callers and logs can see
    // the real exception type and stack, not only the flattened message.
    throw new IOException(errMsg, t);
  }
}
/**
 * Replaces this instance's metrics object. Used only by tests.
 *
 * @param metrics the metrics object to store on this instance
 */
@VisibleForTesting
void setMetricsForTests(NameNodeMetrics metrics) {
  this.metrics = metrics;
}
/**
 * Creates the NameNode metrics for the given role and stores them in the
 * static {@code metrics} field, replacing any previous value.
 *
 * @param conf configuration passed to the metrics factory
 * @param role namenode role passed to the metrics factory
 */
public static void initMetrics(Configuration conf, NamenodeRole role) {
  final NameNodeMetrics created = NameNodeMetrics.create(conf, role);
  metrics = created;
}