/** * Make sure that client failover works when an active NN dies and the standby * takes over. */ @Test public void testDfsClientFailover() throws IOException, URISyntaxException { FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); DFSTestUtil.createFile(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY, (short)1, 1L); assertEquals(fs.getFileStatus(TEST_FILE).getLen(), FILE_LENGTH_TO_VERIFY); cluster.shutdownNameNode(0); cluster.transitionToActive(1); assertEquals(fs.getFileStatus(TEST_FILE).getLen(), FILE_LENGTH_TO_VERIFY); // Check that it functions even if the URL becomes canonicalized // to include a port number. Path withPort = new Path("hdfs://" + HATestUtil.getLogicalHostname(cluster) + ":" + NameNode.DEFAULT_PORT + "/" + TEST_FILE.toUri().getPath()); FileSystem fs2 = withPort.getFileSystem(fs.getConf()); assertTrue(fs2.exists(withPort)); fs.close(); }
/**
 * Starts the NuCypher extension RPC server for the given NameNode.
 *
 * The configuration is read from the NameNode via {@code getConf()}; if that
 * method is missing at runtime (NoSuchMethodError) a warning is logged and a
 * {@code null} configuration is passed on. A failure to create or start the
 * RPC server is logged and does not propagate to the caller.
 *
 * @param service the service being started; must be a {@link NameNode}
 */
@Override
public void start(Object service) {
  NameNode nn = (NameNode) service;
  Configuration conf = null;
  try {
    conf = nn.getConf();
  } catch (NoSuchMethodError ex) {
    LOG.warn("No method getConf() in this NameNode : " + ex);
  }
  try {
    rpcServer = new NuCypherExtRpcServer(conf, nn);
    rpcServer.start();
    LOG.info(toString() + " started");
  } catch (IOException e) {
    // Pass the exception as the throwable argument as well, so the stack
    // trace is recorded and the root cause of the failure is not lost.
    LOG.error("Cannot create NuCypherExtRpcServer: " + e, e);
  }
}
public RpcProgramMountd(NfsConfiguration config, DatagramSocket registrationSocket, boolean allowInsecurePorts) throws IOException { // Note that RPC cache is not enabled super("mountd", "localhost", config.getInt( NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1, VERSION_3, registrationSocket, allowInsecurePorts); exports = new ArrayList<String>(); exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT)); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>()); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); this.dfsClient = new DFSClient(NameNode.getAddress(config), config); }
/** * Returns list of InetSocketAddresses corresponding to namenodes from the * configuration. * * Returns namenode address specifically configured for datanodes (using * service ports), if found. If not, regular RPC address configured for other * clients is returned. * * @param conf configuration * @return list of InetSocketAddress * @throws IOException on error */ public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddresses( Configuration conf) throws IOException { // Use default address as fall back String defaultAddress; try { defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf)); } catch (IllegalArgumentException e) { defaultAddress = null; } Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf, defaultAddress, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY); if (addressList.isEmpty()) { throw new IOException("Incorrect configuration: namenode address " + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or " + DFS_NAMENODE_RPC_ADDRESS_KEY + " is not configured."); } return addressList; }
/** * Start the BackupNode */ public BackupNode startBackupNode(Configuration conf) throws IOException { // Set up testing environment directories hdfsDir = new File(TEST_DATA_DIR, "backupNode"); if ( hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir) ) { throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'"); } File currDir = new File(hdfsDir, "name2"); File currDir2 = new File(currDir, "current"); File currDir3 = new File(currDir, "image"); assertTrue(currDir.mkdirs()); assertTrue(currDir2.mkdirs()); assertTrue(currDir3.mkdirs()); conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(new File(hdfsDir, "name2")).toString()); conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, "${" + DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY + "}"); // Start BackupNode String[] args = new String [] { StartupOption.BACKUP.getName() }; BackupNode bu = (BackupNode)NameNode.createNameNode(args, conf); return bu; }
/**
 * Private helper that saves the current delegation tokens in the fsimage.
 *
 * Writes the number of tokens, then for each token its identifier followed
 * by its renew date, while reporting progress under the SAVING_CHECKPOINT
 * phase.
 *
 * @param out stream to write to
 * @param sdPath path used to label the startup-progress step
 * @throws IOException on write error
 */
private synchronized void saveCurrentTokens(DataOutputStream out,
    String sdPath) throws IOException {
  final StartupProgress progress = NameNode.getStartupProgress();
  final Step tokenStep = new Step(StepType.DELEGATION_TOKENS, sdPath);
  progress.beginStep(Phase.SAVING_CHECKPOINT, tokenStep);
  progress.setTotal(Phase.SAVING_CHECKPOINT, tokenStep, currentTokens.size());
  final Counter saved = progress.getCounter(Phase.SAVING_CHECKPOINT, tokenStep);

  out.writeInt(currentTokens.size());
  for (DelegationTokenIdentifier tokenId : currentTokens.keySet()) {
    tokenId.write(out);
    final DelegationTokenInformation tokenInfo = currentTokens.get(tokenId);
    out.writeLong(tokenInfo.getRenewDate());
    saved.increment();
  }
  progress.endStep(Phase.SAVING_CHECKPOINT, tokenStep);
}
/**
 * Private helper that loads delegation keys from the fsimage.
 *
 * Reads the key count and then that many {@link DelegationKey} records,
 * adding each one via {@code addKey} and reporting progress under the
 * LOADING_FSIMAGE phase.
 *
 * @param in input to read from
 * @throws IOException on error
 */
private synchronized void loadAllKeys(DataInput in) throws IOException {
  final StartupProgress progress = NameNode.getStartupProgress();
  final Step keyStep = new Step(StepType.DELEGATION_KEYS);
  progress.beginStep(Phase.LOADING_FSIMAGE, keyStep);

  final int keyCount = in.readInt();
  progress.setTotal(Phase.LOADING_FSIMAGE, keyStep, keyCount);
  final Counter loaded = progress.getCounter(Phase.LOADING_FSIMAGE, keyStep);

  int remaining = keyCount;
  while (remaining > 0) {
    final DelegationKey key = new DelegationKey();
    key.readFields(in);
    addKey(key);
    loaded.increment();
    remaining--;
  }
  progress.endStep(Phase.LOADING_FSIMAGE, keyStep);
}
/** * Select the delegation token for hdfs. The port will be rewritten to * the port of hdfs.service.host_$nnAddr, or the default rpc namenode port. * This method should only be called by non-hdfs filesystems that do not * use the rpc port to acquire tokens. Ex. webhdfs, hftp * @param nnUri of the remote namenode * @param tokens as a collection * @param conf hadoop configuration * @return Token */ public Token<DelegationTokenIdentifier> selectToken( final URI nnUri, Collection<Token<?>> tokens, final Configuration conf) { // this guesses the remote cluster's rpc service port. // the current token design assumes it's the same as the local cluster's // rpc port unless a config key is set. there should be a way to automatic // and correctly determine the value Text serviceName = SecurityUtil.buildTokenService(nnUri); final String nnServiceName = conf.get(SERVICE_NAME_KEY + serviceName); int nnRpcPort = NameNode.DEFAULT_PORT; if (nnServiceName != null) { nnRpcPort = NetUtils.createSocketAddr(nnServiceName, nnRpcPort).getPort(); } // use original hostname from the uri to avoid unintentional host resolving serviceName = SecurityUtil.buildTokenService( NetUtils.createSocketAddrForHost(nnUri.getHost(), nnRpcPort)); return selectToken(serviceName, tokens); }
/** * Verify secondary namenode port usage. */ @Test(timeout = 300000) public void testSecondaryNodePorts() throws Exception { NameNode nn = null; try { nn = startNameNode(); // bind http server to the same port as name-node Configuration conf2 = new HdfsConfiguration(config); conf2.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY)); LOG.info("= Starting 1 on: " + conf2.get(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY)); boolean started = canStartSecondaryNode(conf2); assertFalse(started); // should fail // bind http server to a different port conf2.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, THIS_HOST); LOG.info("= Starting 2 on: " + conf2.get(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY)); started = canStartSecondaryNode(conf2); assertTrue(started); // should start now } finally { stopNameNode(nn); } }
@Override public synchronized ProxyInfo<T> getProxy() { // Create a non-ha proxy if not already created. if (nnProxyInfo == null) { try { // Create a proxy that is not wrapped in RetryProxy InetSocketAddress nnAddr = NameNode.getAddress(nameNodeUri); nnProxyInfo = new ProxyInfo<T>(NameNodeProxies.createNonHAProxy( conf, nnAddr, xface, UserGroupInformation.getCurrentUser(), false).getProxy(), nnAddr.toString()); } catch (IOException ioe) { throw new RuntimeException(ioe); } } return nnProxyInfo; }
/**
 * Tool entry point: parses the arguments and configuration, checks that
 * formatting is allowed, logs in with the namenode keytab/principal and then
 * executes {@code doRun()} as the login user.
 *
 * @param args command line arguments
 * @return the exit code produced by {@code doRun()}
 * @throws Exception on failure before the privileged action is run
 */
@Override
public int run(String[] args) throws Exception {
  parseArgs(args);
  parseConfAndFindOtherNN();
  NameNode.checkAllowFormat(conf);

  final InetSocketAddress localAddress = NameNode.getAddress(conf);
  SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
      DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, localAddress.getHostName());

  // PrivilegedAction cannot throw checked exceptions, so an IOException
  // from doRun() is rethrown wrapped in a RuntimeException.
  final PrivilegedAction<Integer> action = new PrivilegedAction<Integer>() {
    @Override
    public Integer run() {
      try {
        return doRun();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  };
  return SecurityUtil.doAsLoginUserOrFatal(action);
}
@Test(timeout = 30000) public void testReadSnapshotFileWithCheckpoint() throws Exception { Path foo = new Path("/foo"); hdfs.mkdirs(foo); hdfs.allowSnapshot(foo); Path bar = new Path("/foo/bar"); DFSTestUtil.createFile(hdfs, bar, 100, (short) 2, 100024L); hdfs.createSnapshot(foo, "s1"); assertTrue(hdfs.delete(bar, true)); // checkpoint NameNode nameNode = cluster.getNameNode(); NameNodeAdapter.enterSafeMode(nameNode, false); NameNodeAdapter.saveNamespace(nameNode); NameNodeAdapter.leaveSafeMode(nameNode); // restart namenode to load snapshot files from fsimage cluster.restartNameNode(true); String snapshotPath = Snapshot.getSnapshotPath(foo.toString(), "s1/bar"); DFSTestUtil.readFile(hdfs, new Path(snapshotPath)); }
/**
 * Remove a datanode while holding the namesystem write lock.
 *
 * If no descriptor is found for the given ID a warning is written to the
 * state change log and nothing else happens.
 *
 * @param node ID of the datanode to remove
 * @throws UnregisteredNodeException declared for the descriptor lookup
 */
public void removeDatanode(final DatanodeID node)
    throws UnregisteredNodeException {
  namesystem.writeLock();
  try {
    final DatanodeDescriptor found = getDatanode(node);
    if (found == null) {
      NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
          + node + " does not exist");
      return;
    }
    removeDatanode(found);
  } finally {
    namesystem.writeUnlock();
  }
}
/**
 * Remove a dead datanode.
 *
 * The node is removed only if it can be looked up and is reported dead by
 * {@code isDatanodeDead}; an IOException during the lookup is treated the
 * same as "node not found".
 *
 * @param nodeID ID of the datanode to remove
 */
void removeDeadDatanode(final DatanodeID nodeID) {
  synchronized(datanodeMap) {
    DatanodeDescriptor candidate = null;
    try {
      candidate = getDatanode(nodeID);
    } catch(IOException e) {
      // Lookup failed: leave candidate null so nothing is removed.
    }
    if (candidate == null || !isDatanodeDead(candidate)) {
      return;
    }
    NameNode.stateChangeLog.info(
        "BLOCK* removeDeadDatanode: lost heartbeat from " + candidate);
    removeDatanode(candidate);
  }
}
/** * Process the recorded replicas. When about to commit or finish the * pipeline recovery sort out bad replicas. * @param genStamp The final generation stamp for the block. */ public void setGenerationStampAndVerifyReplicas(long genStamp) { // Set the generation stamp for the block. setGenerationStamp(genStamp); if (replicas == null) return; // Remove the replicas with wrong gen stamp. // The replica list is unchanged. for (ReplicaUnderConstruction r : replicas) { if (genStamp != r.getGenerationStamp()) { r.getExpectedStorageLocation().removeBlock(this); NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica " + "from location: {}", r.getExpectedStorageLocation()); } } }
/** * Remove a block from the under replication queues. * * The priLevel parameter is a hint of which queue to query * first: if negative or >= {@link #LEVEL} this shortcutting * is not attmpted. * * If the block is not found in the nominated queue, an attempt is made to * remove it from all queues. * * <i>Warning:</i> This is not a synchronized method. * @param block block to remove * @param priLevel expected privilege level * @return true if the block was found and removed from one of the priority queues */ boolean remove(Block block, int priLevel) { if(priLevel >= 0 && priLevel < LEVEL && priorityQueues.get(priLevel).remove(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" + " from priority queue {}", block, priLevel); return true; } else { // Try to remove the block from all queues if the block was // not found in the queue for the given priority level. for (int i = 0; i < LEVEL; i++) { if (priorityQueues.get(i).remove(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" + " {} from priority queue {}", block, priLevel); return true; } } } return false; }
/**
 * Add a block to the collection of blocks that will be invalidated on the
 * specified datanode.
 *
 * The per-datanode set is created on demand. The block counter is only
 * incremented, and the optional log line only written, when the block was
 * not already present in the set.
 *
 * @param block block to invalidate
 * @param datanode datanode on which the block is to be invalidated
 * @param log whether to write an entry to the block state change log
 */
synchronized void add(final Block block, final DatanodeInfo datanode,
    final boolean log) {
  LightWeightHashSet<Block> pending = node2blocks.get(datanode);
  if (pending == null) {
    pending = new LightWeightHashSet<Block>();
    node2blocks.put(datanode, pending);
  }
  if (!pending.add(block)) {
    // Already scheduled for this datanode.
    return;
  }
  numBlocks++;
  if (log) {
    NameNode.blockStateChangeLog.info("BLOCK* {}: add {} to {}",
        getClass().getSimpleName(), block, datanode);
  }
}
/**
 * Creates a failover controller for the namenode described by the given
 * configuration.
 *
 * @param conf base configuration
 * @return a controller bound to the local namenode target
 * @throws HadoopIllegalArgumentException if HA is not enabled for the
 *         nameservice or the local namenode ID cannot be determined
 */
public static DFSZKFailoverController create(Configuration conf) {
  final Configuration nnConf = DFSHAAdmin.addSecurityConfiguration(conf);
  final String nameServiceId = DFSUtil.getNamenodeNameServiceId(conf);

  if (!HAUtil.isHAEnabled(nnConf, nameServiceId)) {
    throw new HadoopIllegalArgumentException(
        "HA is not enabled for this namenode.");
  }

  final String nameNodeId = HAUtil.getNameNodeId(nnConf, nameServiceId);
  if (nameNodeId == null) {
    throw new HadoopIllegalArgumentException(
        "Could not get the namenode ID of this node. " +
        "You may run zkfc on the node other than namenode.");
  }

  // Resolve the generic keys for this nameservice/namenode pair before the
  // target is built from the configuration.
  NameNode.initializeGenericKeys(nnConf, nameServiceId, nameNodeId);
  DFSUtil.setGenericConf(nnConf, nameServiceId, nameNodeId, ZKFC_CONF_KEYS);

  final NNHAServiceTarget target =
      new NNHAServiceTarget(nnConf, nameServiceId, nameNodeId);
  return new DFSZKFailoverController(nnConf, target);
}
/**
 * For each of the two namenodes in turn: verify it starts in standby,
 * transition it to active with the admin tool, then back to standby,
 * checking the reported state after every step.
 */
@Test
public void testStateTransition() throws Exception {
  // Index in the cluster paired with the ID passed to the admin tool.
  final String[] nnIds = { "nn1", "nn2" };
  for (int idx = 0; idx < nnIds.length; idx++) {
    final NameNode nn = cluster.getNameNode(idx);
    assertTrue(nn.isStandbyState());

    assertEquals(0, runTool("-transitionToActive", nnIds[idx]));
    assertFalse(nn.isStandbyState());

    assertEquals(0, runTool("-transitionToStandby", nnIds[idx]));
    assertTrue(nn.isStandbyState());
  }
}
/** * @return the node which is expected to run the recovery of the * given block, which is known to be under construction inside the * given NameNOde. */ public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn, ExtendedBlock blk) { BlockManager bm0 = nn.getNamesystem().getBlockManager(); BlockInfoContiguous storedBlock = bm0.getStoredBlock(blk.getLocalBlock()); assertTrue("Block " + blk + " should be under construction, " + "got: " + storedBlock, storedBlock instanceof BlockInfoContiguousUnderConstruction); BlockInfoContiguousUnderConstruction ucBlock = (BlockInfoContiguousUnderConstruction)storedBlock; // We expect that the replica with the most recent heart beat will be // the one to be in charge of the synchronization / recovery protocol. final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations(); DatanodeStorageInfo expectedPrimary = storages[0]; long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor() .getLastUpdateMonotonic(); for (int i = 1; i < storages.length; i++) { final long lastUpdate = storages[i].getDatanodeDescriptor() .getLastUpdateMonotonic(); if (lastUpdate > mostRecentLastUpdate) { expectedPrimary = storages[i]; mostRecentLastUpdate = lastUpdate; } } return expectedPrimary.getDatanodeDescriptor(); }
/**
 * Runs the given shell arguments through an {@link FsShell} and then checks,
 * against the file system of the given namenode, that "/data" exists as a
 * directory before deleting it again.
 *
 * Exceptions are printed to stderr and not rethrown; the file system is
 * closed on every path.
 *
 * @param args arguments handed to the shell
 * @param namenode namenode address used to build the file system URI
 */
private void execute(String [] args, String namenode) {
  final FsShell shell = new FsShell();
  FileSystem fs = null;
  try {
    ToolRunner.run(shell, args);
    fs = FileSystem.get(NameNode.getUri(NameNode.getAddress(namenode)),
        shell.getConf());
    final Path dataDir = new Path("/data");
    assertTrue("Directory does not get created",
        fs.isDirectory(dataDir));
    fs.delete(dataDir, true);
  } catch (Exception e) {
    System.err.println(e.getMessage());
    e.printStackTrace();
  } finally {
    if (fs != null) {
      try {
        fs.close();
      } catch (IOException ignored) {
        // Best-effort close; nothing useful can be done here.
      }
    }
  }
}
/** * Attempts to start a NameNode with the given operation. Starting * the NameNode should throw an exception. */ void startNameNodeShouldFail(String searchString) { try { NameNode.doRollback(conf, false); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) .format(false) .manageDataDfsDirs(false) .manageNameDfsDirs(false) .build(); // should fail throw new AssertionError("NameNode should have failed to start"); } catch (Exception expected) { if (!expected.getMessage().contains(searchString)) { fail("Expected substring '" + searchString + "' in exception " + "but got: " + StringUtils.stringifyException(expected)); } // expected } }
@Test public void testFilesDeletionWithCheckpoint() throws Exception { Path path = new Path("/test"); doWriteAndAbort(fs, path); fs.delete(new Path("/test/test/test2"), true); fs.delete(new Path("/test/test/test3"), true); NameNode nameNode = cluster.getNameNode(); NameNodeAdapter.enterSafeMode(nameNode, false); NameNodeAdapter.saveNamespace(nameNode); NameNodeAdapter.leaveSafeMode(nameNode); cluster.restartNameNode(true); // read snapshot file after restart String test2snapshotPath = Snapshot.getSnapshotPath(path.toString(), "s1/test/test2"); DFSTestUtil.readFile(fs, new Path(test2snapshotPath)); String test3snapshotPath = Snapshot.getSnapshotPath(path.toString(), "s1/test/test3"); DFSTestUtil.readFile(fs, new Path(test3snapshotPath)); }
/**
 * Trigger an edits log roll on the active and then wait for the standby to
 * catch up to all the edits done by the active. This method checks
 * repeatedly for up to NN_LAG_TIMEOUT milliseconds, sleeping SLEEP_TIME
 * between checks, and then fails by throwing
 * {@link CouldNotCatchUpException}.
 *
 * @param active active NN
 * @param standby standby NN which should catch up to active
 * @throws InterruptedException if interrupted while sleeping between checks
 * @throws IOException if an error occurs rolling the edit log
 * @throws CouldNotCatchUpException if the standby doesn't catch up to the
 *         active in NN_LAG_TIMEOUT milliseconds
 */
public static void waitForStandbyToCatchUp(NameNode active,
    NameNode standby) throws InterruptedException, IOException,
    CouldNotCatchUpException {
  // Record the target txid before rolling the log.
  final long targetTxId = active.getNamesystem().getFSImage().getEditLog()
      .getLastWrittenTxId();
  active.getRpcServer().rollEditLog();

  final long startedAt = Time.now();
  while (Time.now() - startedAt < TestEditLogTailer.NN_LAG_TIMEOUT) {
    final long standbyTxId =
        standby.getNamesystem().getFSImage().getLastAppliedTxId();
    if (standbyTxId >= targetTxId) {
      return;
    }
    Thread.sleep(TestEditLogTailer.SLEEP_TIME);
  }

  final long finalStandbyTxId =
      standby.getNamesystem().getFSImage().getLastAppliedTxId();
  throw new CouldNotCatchUpException("Standby did not catch up to txid " +
      targetTxId + " (currently at " + finalStandbyTxId + ")");
}
/**
 * Simulates a crash of the active NN during an edit log roll and verifies
 * that the other NN can still become active.
 *
 * NN 0 is made active, rolls its edit log and is shut down; an empty
 * in-progress edits file is then created in the shared edits directory
 * before NN 1 is transitioned to active.
 *
 * @param writeHeader whether the empty in-progress file gets a header
 * @throws Exception on any test failure
 */
private static void testFailoverAfterCrashDuringLogRoll(boolean writeHeader)
    throws Exception {
  Configuration conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, Integer.MAX_VALUE);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
    .nnTopology(MiniDFSNNTopology.simpleHATopology())
    .numDataNodes(0)
    .build();
  FileSystem fs = null;
  try {
    // Created inside the try so that the cluster is shut down even when
    // configuring the failover file system fails.
    fs = HATestUtil.configureFailoverFs(cluster, conf);
    cluster.transitionToActive(0);
    NameNode nn0 = cluster.getNameNode(0);
    nn0.getRpcServer().rollEditLog();
    cluster.shutdownNameNode(0);
    createEmptyInProgressEditLog(cluster, nn0, writeHeader);
    cluster.transitionToActive(1);
  } finally {
    if (fs != null) {
      IOUtils.cleanup(LOG, fs);
    }
    cluster.shutdown();
  }
}
/**
 * Creates an empty in-progress edits file in the shared edits directory,
 * named for the transaction following the given NameNode's last written one.
 *
 * @param cluster cluster whose shared edits directory (for NNs 0 and 1) is
 *        used
 * @param nn NameNode whose last written txid determines the file name
 * @param writeHeader if true, an edit log header for the current layout
 *        version is written into the otherwise empty file
 * @throws IOException if the file cannot be created or written
 */
private static void createEmptyInProgressEditLog(MiniDFSCluster cluster,
    NameNode nn, boolean writeHeader) throws IOException {
  long txid = nn.getNamesystem().getEditLog().getLastWrittenTxId();
  URI sharedEditsUri = cluster.getSharedEditsDir(0, 1);
  File sharedEditsDir = new File(sharedEditsUri.getPath());
  StorageDirectory storageDir = new StorageDirectory(sharedEditsDir);
  File inProgressFile = NameNodeAdapter.getInProgressEditsFile(storageDir,
      txid + 1);
  assertTrue("Failed to create in-progress edits file",
      inProgressFile.createNewFile());

  if (writeHeader) {
    DataOutputStream out = new DataOutputStream(new FileOutputStream(
        inProgressFile));
    try {
      EditLogFileOutputStream.writeHeader(
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION, out);
    } finally {
      // Release the file handle even if writing the header fails.
      out.close();
    }
  }
}
@Test public void testGetOtherNNHttpAddress() throws IOException { // Use non-local addresses to avoid host address matching Configuration conf = getHAConf("ns1", "1.2.3.1", "1.2.3.2"); conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns1"); // This is done by the NN before the StandbyCheckpointer is created NameNode.initializeGenericKeys(conf, "ns1", "nn1"); // Since we didn't configure the HTTP address, and the default is // 0.0.0.0, it should substitute the address from the RPC configuration // above. StandbyCheckpointer checkpointer = new StandbyCheckpointer(conf, fsn); assertEquals(new URL("http", "1.2.3.2", DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, ""), checkpointer.getActiveNNAddress()); }
/**
 * Checks the FileSystem cache.
 *
 * First verifies that file systems obtained on behalf of different remote
 * users are distinct instances, then runs {@code runTestCache} against the
 * default NameNode port (tolerating a bind failure) and against port 0.
 */
public void testFsCache() throws Exception {
  String[] users = new String[]{"foo","bar"};
  final Configuration conf = new Configuration();
  FileSystem[] fs = new FileSystem[users.length];

  for(int i = 0; i < users.length; i++) {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(users[i]);
    fs[i] = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
      public FileSystem run() throws IOException {
        return FileSystem.get(conf);
    }});
    // Every file system created so far must be a different instance.
    for(int j = 0; j < i; j++) {
      assertFalse(fs[j] == fs[i]);
    }
  }
  FileSystem.closeAll();

  try {
    runTestCache(NameNode.DEFAULT_PORT);
  } catch(java.net.BindException be) {
    // The default port may already be in use on the test machine.
    LOG.warn("Cannot test NameNode.DEFAULT_PORT (="
        + NameNode.DEFAULT_PORT + ")", be);
  }

  runTestCache(0);
}
/**
 * Writes a small uncompressed SequenceFile containing one empty record, sets
 * its replication factor and waits until that replication is reached.
 *
 * @param namenode not used by this method
 * @param conf configuration used to obtain the file system and the writer
 * @param name path of the file to create
 * @param replication replication factor to set and wait for
 * @throws IOException on file system errors
 * @throws TimeoutException declared for the replication wait
 * @throws InterruptedException declared for the replication wait
 */
static void writeFile(NameNode namenode, Configuration conf, Path name,
    short replication)
    throws IOException, TimeoutException, InterruptedException {
  FileSystem fileSys = FileSystem.get(conf);
  SequenceFile.Writer writer =
    SequenceFile.createWriter(fileSys, conf, name,
                              BytesWritable.class, BytesWritable.class,
                              CompressionType.NONE);
  try {
    writer.append(new BytesWritable(), new BytesWritable());
  } finally {
    // Close the writer even if the append fails, so the stream is not leaked.
    writer.close();
  }
  fileSys.setReplication(name, replication);
  DFSTestUtil.waitReplication(fileSys, name, replication);
}
/**
 * Disable the logs that are not very useful for snapshot related tests.
 *
 * Loggers are silenced in three groups: those looked up by name, those
 * looked up by class, and those exposed as static fields.
 */
public static void disableLogs() {
  final String[] loggerNames = {
      "org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner",
      "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl",
      "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService",
  };
  for (String loggerName : loggerNames) {
    GenericTestUtils.disableLog(LogFactory.getLog(loggerName));
  }

  final Class<?>[] loggerOwners = {
      UserGroupInformation.class,
      BlockManager.class,
      FSNamesystem.class,
      DirectoryScanner.class,
      MetricsSystemImpl.class,
  };
  for (Class<?> owner : loggerOwners) {
    GenericTestUtils.disableLog(LogFactory.getLog(owner));
  }

  GenericTestUtils.disableLog(BlockScanner.LOG);
  GenericTestUtils.disableLog(HttpServer2.LOG);
  GenericTestUtils.disableLog(DataNode.LOG);
  GenericTestUtils.disableLog(BlockPoolSliceStorage.LOG);
  GenericTestUtils.disableLog(LeaseManager.LOG);
  GenericTestUtils.disableLog(NameNode.stateChangeLog);
  GenericTestUtils.disableLog(NameNode.blockStateChangeLog);
  GenericTestUtils.disableLog(DFSClient.LOG);
  GenericTestUtils.disableLog(Server.LOG);
}
@Before public void setUp() throws Exception { FileSystem.setDefaultUri(CONF, "hdfs://localhost:0"); CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); // Set properties to make HDFS aware of NodeGroup. CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyWithNodeGroup.class.getName()); CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, NetworkTopologyWithNodeGroup.class.getName()); CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class); CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(baseDir, "name").getPath()); DFSTestUtil.formatNameNode(CONF); namenode = new NameNode(CONF); final BlockManager bm = namenode.getNamesystem().getBlockManager(); replicator = bm.getBlockPlacementPolicy(); cluster = bm.getDatanodeManager().getNetworkTopology(); // construct network topology for(int i=0; i<NUM_OF_DATANODES; i++) { cluster.add(dataNodes[i]); } setupDataNodeCapacity(); }
/**
 * Waits until the last located block of the given path reports exactly the
 * expected number of locations.
 *
 * The check runs every 500 ms for up to 20000 ms. On each attempt it
 * triggers datanode heartbeats, and additionally deletion reports when more
 * locations than expected are seen.
 *
 * @param cluster cluster whose datanodes are nudged between checks
 * @param nn NameNode queried for block locations
 * @param path file whose last block is inspected
 * @param expectedReplicas number of locations to wait for
 * @throws Exception if the wait fails
 */
static void waitForBlockLocations(final MiniDFSCluster cluster,
    final NameNode nn,
    final String path, final int expectedReplicas)
    throws Exception {
  final Supplier<Boolean> replicaCountMatches = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      try {
        final LocatedBlocks located =
            NameNodeAdapter.getBlockLocations(nn, path, 0, 1000);
        final DatanodeInfo[] locations =
            located.getLastLocatedBlock().getLocations();
        for (DatanodeInfo location : locations) {
          Assert.assertNotNull(location);
        }
        final int found = locations.length;
        LOG.info("Got " + found + " locs: " + located);
        if (found > expectedReplicas) {
          // Too many replicas: nudge the datanodes to report deletions.
          cluster.triggerDeletionReports();
        }
        cluster.triggerHeartbeats();
        return found == expectedReplicas;
      } catch (IOException e) {
        LOG.warn("No block locations yet: " + e.getMessage());
        return false;
      }
    }
  };
  GenericTestUtils.waitFor(replicaCountMatches, 500, 20000);
}
/** * This constructor has the signature needed by * {@link AbstractFileSystem#createFileSystem(URI, Configuration)} * * @param theUri which must be that of Hdfs * @param conf configuration * @throws IOException */ Hdfs(final URI theUri, final Configuration conf) throws IOException, URISyntaxException { super(theUri, HdfsConstants.HDFS_URI_SCHEME, true, NameNode.DEFAULT_PORT); if (!theUri.getScheme().equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) { throw new IllegalArgumentException("Passed URI's scheme is not for Hdfs"); } String host = theUri.getHost(); if (host == null) { throw new IOException("Incomplete HDFS URI, no host: " + theUri); } this.dfs = new DFSClient(theUri, conf, getStatistics()); }
/** * Returns list of InetSocketAddresses corresponding to the namenode * that manages this cluster. Note this is to be used by datanodes to get * the list of namenode addresses to talk to. * * Returns namenode address specifically configured for datanodes (using * service ports), if found. If not, regular RPC address configured for other * clients is returned. * * @param conf configuration * @return list of InetSocketAddress * @throws IOException on error */ public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException { // Use default address as fall back String defaultAddress; try { defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf)); } catch (IllegalArgumentException e) { defaultAddress = null; } Collection<String> parentNameServices = conf.getTrimmedStringCollection (DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); if (parentNameServices.isEmpty()) { parentNameServices = conf.getTrimmedStringCollection (DFSConfigKeys.DFS_NAMESERVICES); } else { // Ensure that the internal service is ineed in the list of all available // nameservices. Set<String> availableNameServices = Sets.newHashSet(conf .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES)); for (String nsId : parentNameServices) { if (!availableNameServices.contains(nsId)) { throw new IOException("Unknown nameservice: " + nsId); } } } Map<String, Map<String, InetSocketAddress>> addressList = getAddressesForNsIds(conf, parentNameServices, defaultAddress, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY); if (addressList.isEmpty()) { throw new IOException("Incorrect configuration: namenode address " + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or " + DFS_NAMENODE_RPC_ADDRESS_KEY + " is not configured."); } return addressList; }
/**
 * A utility method for creating credentials.
 *
 * Requests a delegation token from the namenode's RPC server for the given
 * renewer, sets the token's service to the namenode address and stores it
 * under the user's short name.
 *
 * @param namenode namenode that issues the token
 * @param ugi user whose short name is used as the token alias
 * @param renewer renewer of the token
 * @return the credentials holding the token, or null if no token was issued
 * @throws IOException if the token request fails
 */
public static Credentials createCredentials(final NameNode namenode,
    final UserGroupInformation ugi, final String renewer) throws IOException {
  final Token<DelegationTokenIdentifier> issued =
      namenode.getRpcServer().getDelegationToken(new Text(renewer));
  if (issued == null) {
    return null;
  }

  final InetSocketAddress nnAddress = namenode.getNameNodeAddress();
  SecurityUtil.setTokenService(issued, nnAddress);

  final Credentials credentials = new Credentials();
  credentials.addToken(new Text(ugi.getShortUserName()), issued);
  return credentials;
}
/**
 * Check whether the namenode can be started.
 *
 * A {@link java.net.BindException} during creation means "cannot start";
 * any other IOException is propagated. Whatever was created is stopped
 * again before returning.
 *
 * @param conf configuration to start the namenode with
 * @return true if the namenode started, false if it failed to bind
 * @throws IOException on any failure other than a bind failure
 */
private boolean canStartNameNode(Configuration conf) throws IOException {
  NameNode probe = null;
  try {
    probe = NameNode.createNameNode(new String[]{}, conf);
    return true;
  } catch (java.net.BindException e) {
    return false;
  } finally {
    stopNameNode(probe);
  }
}
/**
 * Returns the RPC server of the given namenode.
 *
 * @param namenode namenode to query
 * @return the namenode's RPC server, never null
 * @throws IOException a {@link RetriableException} if the RPC server is not
 *         available yet
 */
private static NamenodeProtocols getRPCServer(NameNode namenode)
    throws IOException {
  final NamenodeProtocols rpcServer = namenode.getRpcServer();
  if (rpcServer != null) {
    return rpcServer;
  }
  throw new RetriableException("Namenode is in startup mode");
}
/**
 * Generates a delegation token for the given user.
 *
 * The token kind is set according to the scheme of the current request:
 * the WebHdfs kind for "http", the SWebHdfs kind otherwise.
 *
 * @param namenode namenode that issues the token
 * @param ugi user the token is created for
 * @param renewer renewer of the token; the user's short name is used when
 *        this is null
 * @return the token, or null if no credentials could be created
 * @throws IOException if the credentials cannot be created
 */
private Token<? extends TokenIdentifier> generateDelegationToken(
    final NameNode namenode, final UserGroupInformation ugi,
    final String renewer) throws IOException {
  final String effectiveRenewer;
  if (renewer != null) {
    effectiveRenewer = renewer;
  } else {
    effectiveRenewer = ugi.getShortUserName();
  }

  final Credentials credentials = DelegationTokenSecretManager
      .createCredentials(namenode, ugi, effectiveRenewer);
  if (credentials == null) {
    return null;
  }

  final Token<? extends TokenIdentifier> token =
      credentials.getAllTokens().iterator().next();
  final Text kind;
  if (request.getScheme().equals("http")) {
    kind = WebHdfsFileSystem.TOKEN_KIND;
  } else {
    kind = SWebHdfsFileSystem.TOKEN_KIND;
  }
  token.setKind(kind);
  return token;
}
private Response post( final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final String fullpath, final PostOpParam op, final ConcatSourcesParam concatSrcs, final BufferSizeParam bufferSize, final ExcludeDatanodesParam excludeDatanodes, final NewLengthParam newLength ) throws IOException, URISyntaxException { final NameNode namenode = (NameNode)context.getAttribute("name.node"); final NamenodeProtocols np = getRPCServer(namenode); switch(op.getValue()) { case APPEND: { final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, fullpath, op.getValue(), -1L, -1L, excludeDatanodes.getValue(), bufferSize); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case CONCAT: { np.concat(fullpath, concatSrcs.getAbsolutePaths()); return Response.ok().build(); } case TRUNCATE: { // We treat each rest request as a separate client. final boolean b = np.truncate(fullpath, newLength.getValue(), "DFSClient_" + DFSUtil.getSecureRandom().nextLong()); final String js = JsonUtil.toJsonString("boolean", b); return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); } default: throw new UnsupportedOperationException(op + " is not supported"); } }
/**
 * Handles a DELETE operation against the namenode stored in the servlet
 * context under "name.node".
 *
 * DELETE removes the path and reports the boolean outcome as JSON;
 * DELETESNAPSHOT deletes the named snapshot of the path.
 *
 * @return the HTTP response for the operation
 * @throws IOException if the RPC server is unavailable or the operation
 *         fails
 * @throws UnsupportedOperationException for any other operation
 */
private Response delete(
    final UserGroupInformation ugi,
    final DelegationParam delegation,
    final UserParam username,
    final DoAsParam doAsUser,
    final String fullpath,
    final DeleteOpParam op,
    final RecursiveParam recursive,
    final SnapshotNameParam snapshotName
    ) throws IOException {
  final NameNode namenode = (NameNode)context.getAttribute("name.node");
  final NamenodeProtocols rpc = getRPCServer(namenode);

  switch(op.getValue()) {
  case DELETE:
  {
    final boolean deleted = rpc.delete(fullpath, recursive.getValue());
    final String json = JsonUtil.toJsonString("boolean", deleted);
    return Response.ok(json).type(MediaType.APPLICATION_JSON).build();
  }
  case DELETESNAPSHOT:
  {
    rpc.deleteSnapshot(fullpath, snapshotName.getValue());
    return Response.ok()
        .type(MediaType.APPLICATION_OCTET_STREAM)
        .build();
  }
  default:
    throw new UnsupportedOperationException(op + " is not supported");
  }
}