/** * Initialize the address and port for this NameNode. In the * non-federated case, the nameservice and namenode ID may be * null. */ private static void initNameNodeAddress(Configuration conf, String nameserviceId, NNConf nnConf) { // Set NN-specific specific key String key = DFSUtil.addKeySuffixes( DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId, nnConf.getNnId()); conf.set(key, "127.0.0.1:" + nnConf.getHttpPort()); key = DFSUtil.addKeySuffixes( DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId, nnConf.getNnId()); conf.set(key, "127.0.0.1:" + nnConf.getIpcPort()); }
/** * Add a namenode to a federated cluster and start it. Configuration of * datanodes in the cluster is refreshed to register with the new namenode. * * @return newly started namenode */ public NameNode addNameNode(Configuration conf, int namenodePort) throws IOException { if(!federation) throw new IOException("cannot add namenode to non-federated cluster"); int nnIndex = nameNodes.length; int numNameNodes = nameNodes.length + 1; NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes]; System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length); nameNodes = newlist; String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1); String nameserviceIds = conf.get(DFS_NAMESERVICES); nameserviceIds += "," + nameserviceId; conf.set(DFS_NAMESERVICES, nameserviceIds); String nnId = null; initNameNodeAddress(conf, nameserviceId, new NNConf(nnId).setIpcPort(namenodePort)); initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex); createNameNode(nnIndex, conf, numDataNodes, true, null, null, nameserviceId, nnId); // Refresh datanodes with the newly started namenode for (DataNodeProperties dn : dataNodes) { DataNode datanode = dn.datanode; datanode.refreshNamenodes(conf); } // Wait for new namenode to get registrations from all the datanodes waitActive(nnIndex); return nameNodes[nnIndex].nameNode; }
/** * Add a namenode to a federated cluster and start it. Configuration of * datanodes in the cluster is refreshed to register with the new namenode. * * @return newly started namenode */ public void addNameNode(Configuration conf, int namenodePort) throws IOException { if(!federation) throw new IOException("cannot add namenode to non-federated cluster"); int nameServiceIndex = namenodes.keys().size(); String nameserviceId = NAMESERVICE_ID_PREFIX + (namenodes.keys().size() + 1); String nameserviceIds = conf.get(DFS_NAMESERVICES); nameserviceIds += "," + nameserviceId; conf.set(DFS_NAMESERVICES, nameserviceIds); String nnId = null; initNameNodeAddress(conf, nameserviceId, new NNConf(nnId).setIpcPort(namenodePort)); // figure out the current number of NNs NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId); int nnIndex = infos == null ? 0 : infos.length; initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex); createNameNode(conf, true, null, null, nameserviceId, nnId); // Refresh datanodes with the newly started namenode for (DataNodeProperties dn : dataNodes) { DataNode datanode = dn.datanode; datanode.refreshNamenodes(conf); } // Wait for new namenode to get registrations from all the datanodes waitActive(nnIndex); }
private void createNameNode(int nnIndex, Configuration conf, int numDataNodes, boolean format, StartupOption operation, String clusterId, NNConf nnConf) throws IOException { Configuration nameNodeConf = new Configuration(conf); nameNodeConf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:" + nnConf.getHttpPort()); nameNodeConf .set(DFS_NAMENODE_RPC_ADDRESS_KEY, "127.0.0.1:" + nnConf.getIpcPort()); nameNodeConf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:0"); // Format and clean out DataNode directories if (format) { DFSTestUtil.formatNameNode(nameNodeConf); } if (operation == StartupOption.UPGRADE) { operation.setClusterId(clusterId); } // Start the NameNode String[] args = (operation == null || operation == StartupOption.FORMAT || operation == StartupOption.REGULAR) ? new String[]{} : new String[]{operation.getName()}; NameNode nn = NameNode.createNameNode(args, nameNodeConf); if (operation == StartupOption.RECOVER) { return; } // After the NN has started, set back the bound ports into // the conf nameNodeConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, nn.getNameNodeAddressHostPortString()); nameNodeConf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY, NetUtils.getHostPortString(nn.getHttpAddress())); nameNodes[nnIndex] = new NameNodeInfo(nn, nnConf.getNnId(), nameNodeConf); }
/** * Test a cluster with even distribution, then a new empty node is added to * the cluster. Test start a cluster with specified number of nodes, and fills * it to be 30% full (with a single file replicated identically to all * datanodes); It then adds one new empty node and starts balancing. */ @Test(timeout = 60000) public void testBalancerWithHANameNodes() throws Exception { Configuration conf = new HdfsConfiguration(); TestBalancer.initConf(conf); long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity String newNodeRack = TestBalancer.RACK2; // new node's rack // array of racks for original nodes in cluster String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 }; // array of capacities of original nodes in cluster long[] capacities = new long[] { TestBalancer.CAPACITY, TestBalancer.CAPACITY }; assertEquals(capacities.length, racks.length); int numOfDatanodes = capacities.length; NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); nn1Conf.setIpcPort(NameNode.DEFAULT_PORT); Configuration copiedConf = new Configuration(conf); cluster = new MiniDFSCluster.Builder(copiedConf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(capacities.length) .racks(racks) .simulatedCapacities(capacities) .build(); HATestUtil.setFailoverConfigurations(cluster, conf); try { cluster.waitActive(); cluster.transitionToActive(1); Thread.sleep(500); client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), ClientProtocol.class).getProxy(); long totalCapacity = TestBalancer.sum(capacities); // fill up the cluster to be 30% full long totalUsedSpace = totalCapacity * 3 / 10; TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 1); // start up an empty node with the same capacity and on the same rack cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack }, new long[] { newNodeCapacity }); totalCapacity += newNodeCapacity; TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, Balancer.Parameters.DEFAULT); } finally { cluster.shutdown(); } }
@Test public void testRefreshNamenodes() throws IOException { // Start cluster with a single NN and DN Configuration conf = new Configuration(); MiniDFSCluster cluster = null; try { MiniDFSNNTopology topology = new MiniDFSNNTopology() .addNameservice(new NSConf("ns1").addNN( new NNConf(null).setIpcPort(nnPort1))) .setFederation(true); cluster = new MiniDFSCluster.Builder(conf) .nnTopology(topology) .build(); DataNode dn = cluster.getDataNodes().get(0); assertEquals(1, dn.getAllBpOs().length); cluster.addNameNode(conf, nnPort2); assertEquals(2, dn.getAllBpOs().length); cluster.addNameNode(conf, nnPort3); assertEquals(3, dn.getAllBpOs().length); cluster.addNameNode(conf, nnPort4); // Ensure a BPOfferService in the datanodes corresponds to // a namenode in the cluster Set<InetSocketAddress> nnAddrsFromCluster = Sets.newHashSet(); for (int i = 0; i < 4; i++) { assertTrue(nnAddrsFromCluster.add( cluster.getNameNode(i).getNameNodeAddress())); } Set<InetSocketAddress> nnAddrsFromDN = Sets.newHashSet(); for (BPOfferService bpos : dn.getAllBpOs()) { for (BPServiceActor bpsa : bpos.getBPServiceActors()) { assertTrue(nnAddrsFromDN.add(bpsa.getNNSocketAddress())); } } assertEquals("", Joiner.on(",").join( Sets.symmetricDifference(nnAddrsFromCluster, nnAddrsFromDN))); } finally { if (cluster != null) { cluster.shutdown(); } } }
/** * Test a cluster with even distribution, then a new empty node is added to * the cluster. Test start a cluster with specified number of nodes, and fills * it to be 30% full (with a single file replicated identically to all * datanodes); It then adds one new empty node and starts balancing. */ @Test(timeout = 60000) public void testBalancerWithHANameNodes() throws Exception { Configuration conf = new HdfsConfiguration(); TestBalancer.initConf(conf); long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity String newNodeRack = TestBalancer.RACK2; // new node's rack // array of racks for original nodes in cluster String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 }; // array of capacities of original nodes in cluster long[] capacities = new long[] { TestBalancer.CAPACITY, TestBalancer.CAPACITY }; assertEquals(capacities.length, racks.length); int numOfDatanodes = capacities.length; NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT); Configuration copiedConf = new Configuration(conf); cluster = new MiniDFSCluster.Builder(copiedConf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(capacities.length) .racks(racks) .simulatedCapacities(capacities) .build(); HATestUtil.setFailoverConfigurations(cluster, conf); try { cluster.waitActive(); cluster.transitionToActive(1); Thread.sleep(500); client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), ClientProtocol.class).getProxy(); long totalCapacity = TestBalancer.sum(capacities); // fill up the cluster to be 30% full long totalUsedSpace = totalCapacity * 3 / 10; TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 1); // start up an empty node with the same capacity and on the same rack cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack }, new long[] { newNodeCapacity }); totalCapacity += newNodeCapacity; TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, BalancerParameters.DEFAULT); } finally { cluster.shutdown(); } }
@Test public void testRefreshNamenodes() throws IOException { // Start cluster with a single NN and DN Configuration conf = new Configuration(); MiniDFSCluster cluster = null; try { MiniDFSNNTopology topology = new MiniDFSNNTopology() .addNameservice(new NSConf("ns1").addNN( new NNConf(null).setIpcPort(nnPort1))) .setFederation(true); cluster = new MiniDFSCluster.Builder(conf) .nnTopology(topology) .build(); DataNode dn = cluster.getDataNodes().get(0); assertEquals(1, dn.getAllBpOs().size()); cluster.addNameNode(conf, nnPort2); assertEquals(2, dn.getAllBpOs().size()); cluster.addNameNode(conf, nnPort3); assertEquals(3, dn.getAllBpOs().size()); cluster.addNameNode(conf, nnPort4); // Ensure a BPOfferService in the datanodes corresponds to // a namenode in the cluster Set<InetSocketAddress> nnAddrsFromCluster = Sets.newHashSet(); for (int i = 0; i < 4; i++) { assertTrue(nnAddrsFromCluster.add( cluster.getNameNode(i).getNameNodeAddress())); } Set<InetSocketAddress> nnAddrsFromDN = Sets.newHashSet(); for (BPOfferService bpos : dn.getAllBpOs()) { for (BPServiceActor bpsa : bpos.getBPServiceActors()) { assertTrue(nnAddrsFromDN.add(bpsa.getNNSocketAddress())); } } assertEquals("", Joiner.on(",").join( Sets.symmetricDifference(nnAddrsFromCluster, nnAddrsFromDN))); } finally { if (cluster != null) { cluster.shutdown(); } } }
/** * Test a cluster with even distribution, then a new empty node is added to * the cluster. Test start a cluster with specified number of nodes, and fills * it to be 30% full (with a single file replicated identically to all * datanodes); It then adds one new empty node and starts balancing. */ @Test(timeout = 60000) public void testBalancerWithHANameNodes() throws Exception { Configuration conf = new HdfsConfiguration(); TestBalancer.initConf(conf); long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity String newNodeRack = TestBalancer.RACK2; // new node's rack // array of racks for original nodes in cluster String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 }; // array of capacities of original nodes in cluster long[] capacities = new long[] { TestBalancer.CAPACITY, TestBalancer.CAPACITY }; assertEquals(capacities.length, racks.length); int numOfDatanodes = capacities.length; NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); nn1Conf.setIpcPort(NameNode.DEFAULT_PORT); Configuration copiedConf = new Configuration(conf); cluster = new MiniDFSCluster.Builder(copiedConf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(capacities.length) .racks(racks) .simulatedCapacities(capacities) .build(); HATestUtil.setFailoverConfigurations(cluster, conf); try { cluster.waitActive(); cluster.transitionToActive(1); Thread.sleep(500); client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), ClientProtocol.class).getProxy(); long totalCapacity = TestBalancer.sum(capacities); // fill up the cluster to be 30% full long totalUsedSpace = totalCapacity * 3 / 10; TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 1); // start up an empty node with the same capacity and on the same rack cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack }, new long[] { newNodeCapacity }); totalCapacity += newNodeCapacity; TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, cluster); } finally { cluster.shutdown(); } }