private void verifyStats(NameNode namenode, FSNamesystem fsn,
    DatanodeInfo info, DataNode node, boolean decommissioning)
    throws InterruptedException, IOException {
  // Do the stats check over 10 heartbeats
  for (int i = 0; i < 10; i++) {
    long[] newStats = namenode.getRpcServer().getStats();

    // For decommissioning nodes, ensure capacity of the DN is no longer
    // counted. Only used space of the DN is counted in cluster capacity
    assertEquals(newStats[0],
        decommissioning ? info.getDfsUsed() : info.getCapacity());

    // Ensure cluster used capacity is counted for both normal and
    // decommissioning nodes
    assertEquals(newStats[1], info.getDfsUsed());

    // For decommissioning nodes, remaining space from the DN is not counted
    assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining());

    // Ensure transceiver count is same as that DN
    assertEquals(fsn.getTotalLoad(), info.getXceiverCount());
    DataNodeTestUtils.triggerHeartbeat(node);
  }
}
private WebHdfsFileSystem getWebHdfsFileSystem(UserGroupInformation ugi,
    Configuration conf) throws IOException {
  if (UserGroupInformation.isSecurityEnabled()) {
    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(
        new Text(ugi.getUserName()), null, null);
    FSNamesystem namesystem = mock(FSNamesystem.class);
    DelegationTokenSecretManager dtSecretManager =
        new DelegationTokenSecretManager(86400000, 86400000, 86400000,
            86400000, namesystem);
    dtSecretManager.startThreads();
    Token<DelegationTokenIdentifier> token =
        new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
    SecurityUtil.setTokenService(
        token, NetUtils.createSocketAddr(uri.getAuthority()));
    token.setKind(WebHdfsFileSystem.TOKEN_KIND);
    ugi.addToken(token);
  }
  return (WebHdfsFileSystem) FileSystem.get(uri, conf);
}
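A minimal usage sketch for the helper above, assuming the test class's `conf` and `uri` fields are already set up against a running cluster; the path in the final call is illustrative, not from the original source.

// Hedged usage sketch (identifiers are illustrative assumptions):
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
FileStatus status = webhdfs.getFileStatus(new Path("/"));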
/**
 * Test that the shared edits dir is automatically added to the list of edits
 * dirs that are marked required.
 */
@Test
public void testSharedDirIsAutomaticallyMarkedRequired()
    throws URISyntaxException {
  URI foo = new URI("file:/foo");
  URI bar = new URI("file:/bar");
  Configuration conf = new Configuration();
  conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
      Joiner.on(",").join(foo, bar));
  conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY,
      foo.toString());
  assertFalse(FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(bar));
  conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, bar.toString());
  Collection<URI> requiredEditsDirs =
      FSNamesystem.getRequiredNamespaceEditsDirs(conf);
  assertTrue(Joiner.on(",").join(requiredEditsDirs) + " does not contain "
      + bar, requiredEditsDirs.contains(bar));
}
/**
 * Multiple shared edits directories is an invalid configuration.
 */
@Test
public void testMultipleSharedDirsFails() throws Exception {
  Configuration conf = new Configuration();
  URI sharedA = new URI("file:///shared-A");
  URI sharedB = new URI("file:///shared-B");
  URI localA = new URI("file:///local-A");
  conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
      Joiner.on(",").join(sharedA, sharedB));
  conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, localA.toString());
  try {
    FSNamesystem.getNamespaceEditsDirs(conf);
    fail("Allowed multiple shared edits directories");
  } catch (IOException ioe) {
    assertEquals("Multiple shared edits directories are not yet supported",
        ioe.getMessage());
  }
}
/**
 * Make sure that the shared edits dirs are listed before non-shared dirs
 * when the configuration is parsed. This ensures that the shared journals
 * are synced before the local ones.
 */
@Test
public void testSharedDirsComeFirstInEditsList() throws Exception {
  Configuration conf = new Configuration();
  URI sharedA = new URI("file:///shared-A");
  URI localA = new URI("file:///local-A");
  URI localB = new URI("file:///local-B");
  URI localC = new URI("file:///local-C");
  conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
      sharedA.toString());
  // List them in reverse order, to make sure they show up in
  // the order listed, regardless of lexical sort order.
  conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
      Joiner.on(",").join(localC, localB, localA));
  List<URI> dirs = FSNamesystem.getNamespaceEditsDirs(conf);
  assertEquals(
      "Shared dirs should come first, then local dirs, in the order "
          + "they were listed in the configuration.",
      Joiner.on(",").join(sharedA, localC, localB, localA),
      Joiner.on(",").join(dirs));
}
/**
 * Test case for entering safe mode on the active NameNode when it is
 * already in startup safe mode. Regression test for HDFS-2747.
 */
@Test
public void testEnterSafeModeInANNShouldNotThrowNPE() throws Exception {
  banner("Restarting active");
  DFSTestUtil.createFile(fs, new Path("/test"), 3 * BLOCK_SIZE, (short) 3,
      1L);
  restartActive();
  nn0.getRpcServer().transitionToActive(
      new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));

  FSNamesystem namesystem = nn0.getNamesystem();
  String status = namesystem.getSafemode();
  assertTrue("Bad safemode status: '" + status + "'",
      status.startsWith("Safe mode is ON."));
  NameNodeAdapter.enterSafeMode(nn0, false);
  assertTrue("Failed to enter into safemode in active",
      namesystem.isInSafeMode());
  NameNodeAdapter.enterSafeMode(nn0, false);
  assertTrue("Failed to enter into safemode in active",
      namesystem.isInSafeMode());
}
/**
 * Ensure that the given NameNode marks the specified DataNode as
 * entirely dead/expired.
 * @param nn the NameNode to manipulate
 * @param dnName the name of the DataNode
 */
public static void noticeDeadDatanode(NameNode nn, String dnName) {
  FSNamesystem namesystem = nn.getNamesystem();
  namesystem.writeLock();
  try {
    DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
    HeartbeatManager hbm = dnm.getHeartbeatManager();
    DatanodeDescriptor[] dnds = hbm.getDatanodes();
    DatanodeDescriptor theDND = null;
    for (DatanodeDescriptor dnd : dnds) {
      if (dnd.getXferAddr().equals(dnName)) {
        theDND = dnd;
      }
    }
    Assert.assertNotNull("Could not find DN with name: " + dnName, theDND);

    synchronized (hbm) {
      DFSTestUtil.setDatanodeDead(theDND);
      hbm.heartbeatCheck();
    }
  } finally {
    namesystem.writeUnlock();
  }
}
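A minimal sketch of how a test might call the helper above, assuming a running MiniDFSCluster; the way the transfer address is obtained is an assumption, not taken from the original source.

// Hedged sketch: force the NameNode to consider the first DataNode dead.
String dnName = cluster.getDataNodes().get(0).getDatanodeId().getXferAddr();
noticeDeadDatanode(cluster.getNameNode(), dnName);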
@Test
public void testSufficientlySingleReplBlockUsesNewRack() throws Exception {
  Configuration conf = getConf();
  short REPLICATION_FACTOR = 1;
  final Path filePath = new Path("/testFile");
  String racks[] = {"/rack1", "/rack1", "/rack1", "/rack2"};
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(racks.length).racks(racks).build();
  final FSNamesystem ns = cluster.getNameNode().getNamesystem();

  try {
    // Create a file with one block with a replication factor of 1
    final FileSystem fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
    ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
    DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);

    REPLICATION_FACTOR = 2;
    NameNodeAdapter.setReplication(ns, "/testFile", REPLICATION_FACTOR);
    DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
  } finally {
    cluster.shutdown();
  }
}
/**
 * Test that an over-replicated block gets invalidated when the replication
 * factor of a partial block is decreased.
 */
@Test
public void testInvalidateOverReplicatedBlock() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
      .build();
  try {
    final FSNamesystem namesystem = cluster.getNamesystem();
    final BlockManager bm = namesystem.getBlockManager();
    FileSystem fs = cluster.getFileSystem();
    Path p = new Path(MiniDFSCluster.getBaseDirectory(), "/foo1");
    FSDataOutputStream out = fs.create(p, (short) 2);
    out.writeBytes("HDFS-3119: " + p);
    out.hsync();
    fs.setReplication(p, (short) 1);
    out.close();
    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, p);
    assertEquals("Expected only one live replica for the block", 1,
        bm.countNodes(block.getLocalBlock()).liveReplicas());
  } finally {
    cluster.shutdown();
  }
}
@Before
public void setupMockCluster() throws IOException {
  Configuration conf = new HdfsConfiguration();
  conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
      "need to set a dummy value here so it assumes a multi-rack cluster");
  fsn = Mockito.mock(FSNamesystem.class);
  Mockito.doReturn(true).when(fsn).hasWriteLock();
  bm = new BlockManager(fsn, conf);
  final String[] racks = {
      "/rackA", "/rackA", "/rackA",
      "/rackB", "/rackB", "/rackB"};
  storages = DFSTestUtil.createDatanodeStorageInfos(racks);
  nodes = Arrays.asList(DFSTestUtil.toDatanodeDescriptor(storages));
  rackA = nodes.subList(0, 3);
  rackB = nodes.subList(3, 6);
}
/** Wait until the given namenode gets first block reports from all the datanodes */
public void waitFirstBRCompleted(int nnIndex, int timeout)
    throws IOException, TimeoutException, InterruptedException {
  if (namenodes.size() == 0 || getNN(nnIndex) == null
      || getNN(nnIndex).nameNode == null) {
    return;
  }

  final FSNamesystem ns = getNamesystem(nnIndex);
  final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      List<DatanodeDescriptor> nodes = dm.getDatanodeListForReport(
          DatanodeReportType.LIVE);
      for (DatanodeDescriptor node : nodes) {
        if (!node.checkBlockReportReceived()) {
          return false;
        }
      }
      return true;
    }
  }, 100, timeout);
}
/**
 * Set up the mock context.
 *
 * - extension is always needed (default period is {@link #EXTENSION} ms)
 * - datanode threshold is always reached via mock
 * - safe block is 0 and it needs {@link #BLOCK_THRESHOLD} to reach threshold
 * - write/read lock is always held by current thread
 *
 * @throws IOException
 */
@Before
public void setupMockCluster() throws IOException {
  Configuration conf = new HdfsConfiguration();
  conf.setDouble(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
      THRESHOLD);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, EXTENSION);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
      DATANODE_NUM);

  FSNamesystem fsn = mock(FSNamesystem.class);
  Mockito.doReturn(true).when(fsn).hasWriteLock();
  Mockito.doReturn(true).when(fsn).hasReadLock();
  Mockito.doReturn(true).when(fsn).isRunning();
  NameNode.initMetrics(conf, NamenodeRole.NAMENODE);

  bm = spy(new BlockManager(fsn, conf));
  dn = spy(bm.getDatanodeManager());
  Whitebox.setInternalState(bm, "datanodeManager", dn);
  // the datanode threshold is always met
  when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);

  bmSafeMode = new BlockManagerSafeMode(bm, fsn, conf);
}
public MyDelegationTokenSecretManager(long delegationKeyUpdateInterval,
    long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
    long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
  super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
      delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
      namesystem);
}
public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
    long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
    long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
  this(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
      delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
      false, namesystem);
}
public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
  this.tailerThread = new EditLogTailerThread();
  this.conf = conf;
  this.namesystem = namesystem;
  this.editLog = namesystem.getEditLog();

  lastLoadTimeMs = monotonicNow();

  logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
      DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000;
  if (logRollPeriodMs >= 0) {
    this.activeAddr = getActiveNodeAddress();
    Preconditions.checkArgument(activeAddr.getPort() > 0,
        "Active NameNode must have an IPC port configured. " +
        "Got address '%s'", activeAddr);
    LOG.info("Will roll logs on active node at " + activeAddr + " every " +
        (logRollPeriodMs / 1000) + " seconds.");
  } else {
    LOG.info("Not going to trigger log rolls on active node because " +
        DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative.");
  }

  sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
      DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;

  LOG.debug("logRollPeriodMs=" + logRollPeriodMs +
      " sleepTime=" + sleepTimeMs);
}
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
    throws IOException {
  this.namesystem = ns;
  this.conf = conf;
  this.checkpointConf = new CheckpointConf(conf);
  this.thread = new CheckpointerThread();
  this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
      .setNameFormat("TransferFsImageUpload-%d").build();

  setNameNodeAddresses(conf);
}
private void parseConfAndFindOtherNN() throws IOException {
  Configuration conf = getConf();
  nsId = DFSUtil.getNamenodeNameServiceId(conf);

  if (!HAUtil.isHAEnabled(conf, nsId)) {
    throw new HadoopIllegalArgumentException(
        "HA is not enabled for this namenode.");
  }
  nnId = HAUtil.getNameNodeId(conf, nsId);
  NameNode.initializeGenericKeys(conf, nsId, nnId);

  if (!HAUtil.usesSharedEditsDir(conf)) {
    throw new HadoopIllegalArgumentException(
        "Shared edits storage is not enabled for this namenode.");
  }

  Configuration otherNode = HAUtil.getConfForOtherNode(conf);
  otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
  otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
  Preconditions.checkArgument(otherIpcAddr.getPort() != 0 &&
      !otherIpcAddr.getAddress().isAnyLocalAddress(),
      "Could not determine valid IPC address for other NameNode (%s)" +
      ", got: %s", otherNNId, otherIpcAddr);

  final String scheme = DFSUtil.getHttpClientScheme(conf);
  otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost(
      otherIpcAddr.getHostName(), otherNode, scheme).toURL();

  dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
  editUrisToFormat = FSNamesystem.getNamespaceEditsDirs(conf, false);
  sharedEditsUris = FSNamesystem.getSharedEditsDirs(conf);
}
public Saver(FSImageFormatProtobuf.Saver parent, FileSummary.Builder headers,
    SaveNamespaceContext context, FSNamesystem fsn) {
  this.parent = parent;
  this.headers = headers;
  this.context = context;
  this.fsn = fsn;
}
public CacheReplicationMonitor(FSNamesystem namesystem,
    CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
  this.namesystem = namesystem;
  this.blockManager = namesystem.getBlockManager();
  this.cacheManager = cacheManager;
  this.cachedBlocks = cacheManager.getCachedBlocks();
  this.intervalMs = intervalMs;
  this.lock = lock;
  this.doRescan = this.lock.newCondition();
  this.scanFinished = this.lock.newCondition();
}
/**
 * Keep accessing the given file until the namenode reports that the
 * given block in the file contains the given number of corrupt replicas.
 */
public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
    Path file, ExtendedBlock b, int corruptRepls)
    throws TimeoutException, InterruptedException {
  int count = 0;
  final int ATTEMPTS = 50;
  int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
  while (repls != corruptRepls && count < ATTEMPTS) {
    try {
      IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
          512, true);
    } catch (IOException e) {
      // Swallow exceptions
    }
    System.out.println("Waiting for " + corruptRepls + " corrupt replicas");
    count++;
    // check more often so corrupt block reports are not easily missed
    for (int i = 0; i < 10; i++) {
      repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
      Thread.sleep(100);
      if (repls == corruptRepls) {
        break;
      }
    }
  }
  if (count == ATTEMPTS) {
    throw new TimeoutException("Timed out waiting for corrupt replicas."
        + " Waiting for " + corruptRepls + ", but only found " + repls);
  }
}
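A minimal sketch of calling the helper above from a test, assuming one replica of the block has already been corrupted on disk beforehand; the variable names and expected count are illustrative.

// Hedged sketch: wait until the NameNode reports one corrupt replica.
ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, file);
waitCorruptReplicas(fs, cluster.getNamesystem(), file, blk, 1);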
/**
 * Wait for datanode to reach alive or dead state for waitTime given in
 * milliseconds.
 */
public static void waitForDatanodeState(
    final MiniDFSCluster cluster, final String nodeID,
    final boolean alive, int waitTime)
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      FSNamesystem namesystem = cluster.getNamesystem();
      final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
          namesystem, nodeID);
      return (dd.isAlive == alive);
    }
  }, 100, waitTime);
}
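A minimal sketch of the helper above in use, assuming a MiniDFSCluster test; the UUID lookup and the 20-second timeout are illustrative assumptions.

// Hedged sketch: restart a DataNode and wait for the NameNode to mark it
// alive again.
String dnUuid =
    cluster.getDataNodes().get(0).getDatanodeId().getDatanodeUuid();
cluster.restartDataNode(0);
waitForDatanodeState(cluster, dnUuid, true, 20000);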
public static void setNameNodeLogLevel(Level level) {
  GenericTestUtils.setLogLevel(FSNamesystem.LOG, level);
  GenericTestUtils.setLogLevel(BlockManager.LOG, level);
  GenericTestUtils.setLogLevel(LeaseManager.LOG, level);
  GenericTestUtils.setLogLevel(NameNode.LOG, level);
  GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
  GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
}
/**
 * Test case for entering safe mode on the standby NameNode when it is
 * already in startup safe mode. Regression test for HDFS-2747.
 */
@Test
public void testEnterSafeModeInSBNShouldNotThrowNPE() throws Exception {
  banner("Starting with NN0 active and NN1 standby, creating some blocks");
  DFSTestUtil.createFile(fs, new Path("/test"), 3 * BLOCK_SIZE, (short) 3,
      1L);
  // Roll edit log so that, when the SBN restarts, it will load
  // the namespace during startup and enter safemode.
  nn0.getRpcServer().rollEditLog();

  banner("Creating some blocks that won't be in the edit log");
  DFSTestUtil.createFile(fs, new Path("/test2"), 5 * BLOCK_SIZE, (short) 3,
      1L);

  banner("Deleting the original blocks");
  fs.delete(new Path("/test"), true);

  banner("Restarting standby");
  restartStandby();
  FSNamesystem namesystem = nn1.getNamesystem();
  String status = namesystem.getSafemode();
  assertTrue("Bad safemode status: '" + status + "'",
      status.startsWith("Safe mode is ON."));
  NameNodeAdapter.enterSafeMode(nn1, false);
  assertTrue("Failed to enter into safemode in standby",
      namesystem.isInSafeMode());
  NameNodeAdapter.enterSafeMode(nn1, false);
  assertTrue("Failed to enter into safemode in standby",
      namesystem.isInSafeMode());
}
/**
 * Tests that the namenode edits dirs and shared edits dirs are returned
 * with duplicates removed.
 */
@Test
public void testHAUniqueEditDirs() throws IOException {
  Configuration conf = new Configuration();

  conf.set(DFS_NAMENODE_EDITS_DIR_KEY, "file://edits/dir, "
      + "file://edits/shared/dir"); // overlapping
  conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, "file://edits/shared/dir");

  // getNamespaceEditsDirs removes duplicates across edits and shared.edits
  Collection<URI> editsDirs = FSNamesystem.getNamespaceEditsDirs(conf);
  assertEquals(2, editsDirs.size());
}
private static void lowerKeyUpdateIntervalAndClearKeys(
    FSNamesystem namesystem) {
  BlockTokenSecretManager btsm = namesystem.getBlockManager()
      .getBlockTokenSecretManager();
  btsm.setKeyUpdateIntervalForTesting(2 * 1000);
  btsm.setTokenLifetime(2 * 1000);
  btsm.clearAllKeysForTesting();
}
/** Disable the logs that are not very useful for snapshot related tests. */
public static void disableLogs() {
  final String[] lognames = {
      "org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner",
      "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl",
      "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService",
  };
  for (String n : lognames) {
    GenericTestUtils.disableLog(LogFactory.getLog(n));
  }

  GenericTestUtils.disableLog(LogFactory.getLog(UserGroupInformation.class));
  GenericTestUtils.disableLog(LogFactory.getLog(BlockManager.class));
  GenericTestUtils.disableLog(LogFactory.getLog(FSNamesystem.class));
  GenericTestUtils.disableLog(LogFactory.getLog(DirectoryScanner.class));
  GenericTestUtils.disableLog(LogFactory.getLog(MetricsSystemImpl.class));

  GenericTestUtils.disableLog(BlockScanner.LOG);
  GenericTestUtils.disableLog(HttpServer2.LOG);
  GenericTestUtils.disableLog(DataNode.LOG);
  GenericTestUtils.disableLog(BlockPoolSliceStorage.LOG);
  GenericTestUtils.disableLog(LeaseManager.LOG);
  GenericTestUtils.disableLog(NameNode.stateChangeLog);
  GenericTestUtils.disableLog(NameNode.blockStateChangeLog);
  GenericTestUtils.disableLog(DFSClient.LOG);
  GenericTestUtils.disableLog(Server.LOG);
}
/** @return the datanode descriptor for the given storageID. */
public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
    final String storageID) {
  ns.readLock();
  try {
    return ns.getBlockManager().getDatanodeManager().getDatanode(storageID);
  } finally {
    ns.readUnlock();
  }
}
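A minimal sketch of the lookup helper above, assuming a MiniDFSCluster test; the way the storage ID is obtained is an assumption, not taken from the original source.

// Hedged sketch: fetch the descriptor for the first DataNode in the cluster.
FSNamesystem ns = cluster.getNamesystem();
String uuid =
    cluster.getDataNodes().get(0).getDatanodeId().getDatanodeUuid();
DatanodeDescriptor dd = getDatanode(ns, uuid);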