private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity, Balancer.Parameters p, int excludedNodes) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final int r = runBalancer(namenodes, p, conf); if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); return; } else { assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);" ."); waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); }
/**
 * Starts from a cluster with even distribution, adds the three hosts
 * "datanodeX", "datanodeY" and "datanodeZ", and runs the balancer with
 * "datanodeY" and "datanodeZ" in the exclude list.
 */
@Test(timeout=100000)
public void testBalancerWithExcludeList() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);

  final String[] addedHosts = {"datanodeX", "datanodeY", "datanodeZ"};
  final Set<String> excluded = new HashSet<String>();
  excluded.add("datanodeY");
  excluded.add("datanodeZ");
  final HostNameBasedNodes newNodes = new HostNameBasedNodes(
      addedHosts, excluded, Parameters.DEFAULT.nodesToBeIncluded);

  final long[] initialCapacities = {CAPACITY, CAPACITY};
  final String[] initialRacks = {RACK0, RACK1};
  // NOTE(review): the two trailing flags presumably select "run through the
  // CLI" and "pass the host list in a file" -- confirm against doTest.
  doTest(conf, initialCapacities, initialRacks, CAPACITY, RACK2, newNodes,
      false, false);
}
/**
 * Starts from a cluster with even distribution, adds the three hosts
 * "datanodeX", "datanodeY" and "datanodeZ", and runs the balancer with
 * "datanodeY" and "datanodeZ" in the exclude list. Differs from
 * the non-Cli variant only in the first trailing flag passed to doTest.
 */
@Test(timeout=100000)
public void testBalancerCliWithExcludeList() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);

  final String[] addedHosts = {"datanodeX", "datanodeY", "datanodeZ"};
  final Set<String> excluded = new HashSet<String>();
  excluded.add("datanodeY");
  excluded.add("datanodeZ");
  final HostNameBasedNodes newNodes = new HostNameBasedNodes(
      addedHosts, excluded, Parameters.DEFAULT.nodesToBeIncluded);

  final long[] initialCapacities = {CAPACITY, CAPACITY};
  final String[] initialRacks = {RACK0, RACK1};
  // NOTE(review): the first trailing flag presumably makes doTest drive the
  // balancer through its command-line entry point -- confirm against doTest.
  doTest(conf, initialCapacities, initialRacks, CAPACITY, RACK2, newNodes,
      true, false);
}
/**
 * Starts from a cluster with even distribution, adds the three hosts
 * "datanodeX", "datanodeY" and "datanodeZ", and runs the balancer with
 * "datanodeY" and "datanodeZ" in the exclude list in a file.
 */
@Test(timeout=100000)
public void testBalancerCliWithExcludeListInAFile() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);

  final String[] addedHosts = {"datanodeX", "datanodeY", "datanodeZ"};
  final Set<String> excluded = new HashSet<String>();
  excluded.add("datanodeY");
  excluded.add("datanodeZ");
  final HostNameBasedNodes newNodes = new HostNameBasedNodes(
      addedHosts, excluded, Parameters.DEFAULT.nodesToBeIncluded);

  final long[] initialCapacities = {CAPACITY, CAPACITY};
  final String[] initialRacks = {RACK0, RACK1};
  // NOTE(review): both trailing flags are set; presumably CLI mode with the
  // host list supplied through a file -- confirm against doTest.
  doTest(conf, initialCapacities, initialRacks, CAPACITY, RACK2, newNodes,
      true, true);
}
/**
 * Starts from a cluster with even distribution, adds the three hosts
 * "datanodeX", "datanodeY" and "datanodeZ", and runs the balancer with
 * only "datanodeY" in the include list.
 */
@Test(timeout=100000)
public void testBalancerWithIncludeList() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);

  final String[] addedHosts = {"datanodeX", "datanodeY", "datanodeZ"};
  final Set<String> included = new HashSet<String>();
  included.add("datanodeY");
  final HostNameBasedNodes newNodes = new HostNameBasedNodes(
      addedHosts, Parameters.DEFAULT.nodesToBeExcluded, included);

  final long[] initialCapacities = {CAPACITY, CAPACITY};
  final String[] initialRacks = {RACK0, RACK1};
  // NOTE(review): the two trailing flags presumably select "run through the
  // CLI" and "pass the host list in a file" -- confirm against doTest.
  doTest(conf, initialCapacities, initialRacks, CAPACITY, RACK2, newNodes,
      false, false);
}
/**
 * Starts from a cluster with even distribution, adds the three hosts
 * "datanodeX", "datanodeY" and "datanodeZ", and runs the balancer with
 * only "datanodeY" in the include list. Differs from the non-Cli variant
 * only in the first trailing flag passed to doTest.
 */
@Test(timeout=100000)
public void testBalancerCliWithIncludeList() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);

  final String[] addedHosts = {"datanodeX", "datanodeY", "datanodeZ"};
  final Set<String> included = new HashSet<String>();
  included.add("datanodeY");
  final HostNameBasedNodes newNodes = new HostNameBasedNodes(
      addedHosts, Parameters.DEFAULT.nodesToBeExcluded, included);

  final long[] initialCapacities = {CAPACITY, CAPACITY};
  final String[] initialRacks = {RACK0, RACK1};
  // NOTE(review): the first trailing flag presumably makes doTest drive the
  // balancer through its command-line entry point -- confirm against doTest.
  doTest(conf, initialCapacities, initialRacks, CAPACITY, RACK2, newNodes,
      true, false);
}
/**
 * Starts from a cluster with even distribution, adds the three hosts
 * "datanodeX", "datanodeY" and "datanodeZ", and runs the balancer with
 * only "datanodeY" in the include list, with both trailing doTest flags
 * set.
 */
@Test(timeout=100000)
public void testBalancerCliWithIncludeListInAFile() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);

  final String[] addedHosts = {"datanodeX", "datanodeY", "datanodeZ"};
  final Set<String> included = new HashSet<String>();
  included.add("datanodeY");
  final HostNameBasedNodes newNodes = new HostNameBasedNodes(
      addedHosts, Parameters.DEFAULT.nodesToBeExcluded, included);

  final long[] initialCapacities = {CAPACITY, CAPACITY};
  final String[] initialRacks = {RACK0, RACK1};
  // NOTE(review): both trailing flags are set; presumably CLI mode with the
  // host list supplied through a file -- confirm against doTest.
  doTest(conf, initialCapacities, initialRacks, CAPACITY, RACK2, newNodes,
      true, true);
}
/** * Make sure that balancer can't move pinned blocks. * If specified favoredNodes when create file, blocks will be pinned use * sticky bit. * @throws Exception */ @Test(timeout=100000) public void testBalancerWithPinnedBlocks() throws Exception { // This test assumes stick-bit based block pin mechanism available only // in Linux/Unix. It can be unblocked on Windows when HDFS-7759 is ready to // provide a different mechanism for Windows. assumeTrue(!Path.WINDOWS); final Configuration conf = new HdfsConfiguration(); initConf(conf); conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true); long[] capacities = new long[] { CAPACITY, CAPACITY }; String[] racks = { RACK0, RACK1 }; int numOfDatanodes = capacities.length; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) .hosts(new String[]{"localhost", "localhost"}) .racks(racks).simulatedCapacities(capacities).build(); try { cluster.waitActive(); client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); // fill up the cluster to be 80% full long totalCapacity = sum(capacities); long totalUsedSpace = totalCapacity * 8 / 10; InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes]; for (int i = 0; i < favoredNodes.length; i++) { favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress(); } DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false, favoredNodes); // start up an empty node with the same capacity cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 }, new long[] { CAPACITY }); totalCapacity += CAPACITY; // run balancer and validate results waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); int r =, Balancer.Parameters.DEFAULT, conf); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); } 
finally { cluster.shutdown(); } }
/**
 * Wait until balanced: each datanode gives utilization within
 * BALANCE_ALLOWED_VARIANCE of average.
 *
 * <p>The average is {@code totalUsedSpace / totalCapacity}, where the
 * capacity is first replaced by {@code includedCount * CAPACITY} when an
 * include list is given and then reduced by {@code excludedCount * CAPACITY}
 * when an exclude list is given. Nodes that are excluded, or not included,
 * must report zero utilization and are counted; the count must equal
 * {@code expectedExcludedNodes}.
 *
 * @param totalUsedSpace used space across the cluster
 * @param totalCapacity capacity across the cluster
 * @param client proxy used to fetch the datanode report
 * @param cluster cluster whose datanode count the report must match
 * @param p balancer parameters carrying the include/exclude lists
 * @param expectedExcludedNodes number of nodes expected to be skipped
 * @throws IOException if the datanode report cannot be fetched
 * @throws TimeoutException if a node is still out of balance after TIMEOUT
 */
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
    ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p,
    int expectedExcludedNodes) throws IOException, TimeoutException {
  long timeout = TIMEOUT;
  // A non-positive timeout means "wait forever".
  long failtime = (timeout <= 0L) ? Long.MAX_VALUE
      : Time.monotonicNow() + timeout;
  if (!p.nodesToBeIncluded.isEmpty()) {
    totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
  }
  if (!p.nodesToBeExcluded.isEmpty()) {
    totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
  }
  final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
  boolean balanced;
  do {
    DatanodeInfo[] datanodeReport =
        client.getDatanodeReport(DatanodeReportType.ALL);
    assertEquals(datanodeReport.length, cluster.getDataNodes().size());
    balanced = true;
    int actualExcludedNodeCount = 0;
    for (DatanodeInfo datanode : datanodeReport) {
      double nodeUtilization = ((double)datanode.getDfsUsed())
          / datanode.getCapacity();
      if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) {
        assertTrue(nodeUtilization == 0);
        actualExcludedNodeCount++;
        continue;
      }
      if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) {
        assertTrue(nodeUtilization == 0);
        actualExcludedNodeCount++;
        continue;
      }
      if (Math.abs(avgUtilization - nodeUtilization)
          > BALANCE_ALLOWED_VARIANCE) {
        balanced = false;
        if (Time.monotonicNow() > failtime) {
          throw new TimeoutException(
              "Rebalancing expected avg utilization to become "
              + avgUtilization + ", but on datanode " + datanode
              + " it remains at " + nodeUtilization
              + " after more than " + TIMEOUT + " msec.");
        }
        try {
          Thread.sleep(100);
        } catch (InterruptedException ignored) {
          // Interruption is ignored; the outer loop polls again.
        }
        break;
      }
    }
    // The count is complete only when the scan was not cut short by the
    // break above; checking a partial count could fail spuriously.
    if (balanced) {
      assertEquals(expectedExcludedNodes, actualExcludedNodeCount);
    }
  } while (!balanced);
}
/**
 * Convenience overload: delegates to the five-argument runBalancer with
 * {@code Balancer.Parameters.DEFAULT} and an expected excluded-node count
 * of zero.
 *
 * @param conf cluster configuration
 * @param totalUsedSpace forwarded unchanged
 * @param totalCapacity forwarded unchanged
 * @throws Exception if the delegated run fails
 */
private void runBalancer(Configuration conf, long totalUsedSpace,
    long totalCapacity) throws Exception {
  final int noExcludedNodes = 0;
  runBalancer(conf, totalUsedSpace, totalCapacity,
      Balancer.Parameters.DEFAULT, noExcludedNodes);
}
@Test(timeout=100000) public void testUnknownDatanode() throws Exception { Configuration conf = new HdfsConfiguration(); initConf(conf); long distribution[] = new long[] {50*CAPACITY/100, 70*CAPACITY/100, 0*CAPACITY/100}; long capacities[] = new long[]{CAPACITY, CAPACITY, CAPACITY}; String racks[] = new String[] {RACK0, RACK1, RACK1}; int numDatanodes = distribution.length; if (capacities.length != numDatanodes || racks.length != numDatanodes) { throw new IllegalArgumentException("Array length is not the same"); } // calculate total space that need to be filled final long totalUsedSpace = sum(distribution); // fill the cluster ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace, (short) numDatanodes); // redistribute blocks Block[][] blocksDN = distributeBlocks( blocks, (short)(numDatanodes-1), distribution); // restart the cluster: do NOT format the cluster conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f"); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes) .format(false) .racks(racks) .simulatedCapacities(capacities) .build(); try { cluster.waitActive(); client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); for(int i = 0; i < 3; i++) { cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null); } cluster.startDataNodes(conf, 1, true, null, new String[]{RACK0}, null,new long[]{CAPACITY}); cluster.triggerHeartbeats(); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Set<String> datanodes = new HashSet<String>(); datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); Balancer.Parameters p = new Balancer.Parameters( Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.maxIdleIteration, datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); final int r =, p, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } finally { cluster.shutdown(); } }
/** * Test special case. Two replicas belong to same block should not in same node. * We have 2 nodes. * We have a block in (DN0,SSD) and (DN1,DISK). * Replica in (DN0,SSD) should not be moved to (DN1,SSD). * Otherwise DN1 has 2 replicas. */ @Test(timeout=100000) public void testTwoReplicaShouldNotInSameDN() throws Exception { final Configuration conf = new HdfsConfiguration(); int blockSize = 5 * 1024 * 1024 ; conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); int numOfDatanodes =2; final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(2) .racks(new String[]{"/default/rack0", "/default/rack0"}) .storagesPerDatanode(2) .storageTypes(new StorageType[][]{ {StorageType.SSD, StorageType.DISK}, {StorageType.SSD, StorageType.DISK}}) .storageCapacities(new long[][]{ {100 * blockSize, 20 * blockSize}, {20 * blockSize, 100 * blockSize}}) .build(); try { cluster.waitActive(); //set "/bar" directory with ONE_SSD storage policy. DistributedFileSystem fs = cluster.getFileSystem(); Path barDir = new Path("/bar"); fs.mkdir(barDir,new FsPermission((short)777)); fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME); // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full, // and (DN0,SSD) and (DN1,DISK) are about 15% full. long fileLen = 30 * blockSize; // fooFile has ONE_SSD policy. So // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block. // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block. 
Path fooFile = new Path(barDir, "foo"); createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0); // update space info cluster.triggerHeartbeats(); Balancer.Parameters p = Balancer.Parameters.DEFAULT; Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final int r =, p, conf); // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK) // already has one. Otherwise DN1 will have 2 replicas. // For same reason, no replicas were moved. assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); } finally { cluster.shutdown(); } }
/** * Wait until balanced: each datanode gives utilization within * BALANCE_ALLOWED_VARIANCE of average * @throws IOException * @throws TimeoutException */ static void waitForBalancer(long totalUsedSpace, long totalCapacity, ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p, int expectedExcludedNodes) throws IOException, TimeoutException { long timeout = TIMEOUT; long failtime = (timeout <= 0L) ? Long.MAX_VALUE : + timeout; if (!p.nodesToBeIncluded.isEmpty()) { totalCapacity = p.nodesToBeIncluded.size() * CAPACITY; } if (!p.nodesToBeExcluded.isEmpty()) { totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY; } final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; boolean balanced; do { DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL); assertEquals(datanodeReport.length, cluster.getDataNodes().size()); balanced = true; int actualExcludedNodeCount = 0; for (DatanodeInfo datanode : datanodeReport) { double nodeUtilization = ((double)datanode.getDfsUsed()) / datanode.getCapacity(); if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) { assertTrue(nodeUtilization == 0); actualExcludedNodeCount++; continue; } if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) { assertTrue(nodeUtilization == 0); actualExcludedNodeCount++; continue; } if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) { balanced = false; if ( > failtime) { throw new TimeoutException( "Rebalancing expected avg utilization to become " + avgUtilization + ", but on datanode " + datanode + " it remains at " + nodeUtilization + " after more than " + TIMEOUT + " msec."); } try { Thread.sleep(100); } catch (InterruptedException ignored) { } break; } } assertEquals(expectedExcludedNodes,actualExcludedNodeCount); } while (!balanced); }