private void waitForDNHeartbeat(DataNode dn, long timeoutMillis, int nnIndex) throws IOException, InterruptedException { InetSocketAddress addr = new InetSocketAddress("localhost", getNameNodePort(nnIndex)); DFSClient client = new DFSClient(addr, nameNodes[nnIndex].conf); int namespaceId = getNameNode(nnIndex).getNamespaceID(); long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() < startTime + timeoutMillis) { DatanodeInfo report[] = client.datanodeReport(DatanodeReportType.LIVE); for (DatanodeInfo thisReport : report) { if (thisReport.getStorageID().equals( dn.getDNRegistrationForNS(namespaceId).getStorageID())) { if (thisReport.getLastUpdate() > startTime) return; } } Thread.sleep(500); } }
@Test public void testReportingNodesDNShutdown() throws Exception { FSNamesystem namesystem = cluster.getNameNode().namesystem; waitForNodesReporting(3, namesystem); cluster.shutdownDataNode(0, false); int live = namesystem.getDatanodeListForReport(DatanodeReportType.LIVE) .size(); long start = System.currentTimeMillis(); while (live != 2 && System.currentTimeMillis() - start < 30000) { live = namesystem.getDatanodeListForReport(DatanodeReportType.LIVE) .size(); System.out.println("Waiting for live : " + live); Thread.sleep(1000); } assertEquals(2, live); waitForNodesReporting(2, namesystem); cluster.restartDataNode(0); waitForNodesReporting(3, namesystem); }
/** When function exits then cluster is balanced (no other guarantees, might loop forever) */ private void assertBalanced(long totalUsedSpace, long totalCapacity) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity); boolean balanced; do { DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL); assertEquals(datanodeReport.length, cluster.getDataNodes().size()); balanced = true; double avgUtilization = ((double)totalUsedSpace)/totalCapacity*100; for(DatanodeInfo datanode:datanodeReport) { double util = ((double) datanode.getDfsUsed()) / datanode.getCapacity() * 100; if (Math.abs(avgUtilization - util) > 10 || util > 99) { balanced = false; DFSTestUtil.waitNMilliSecond(100); break; } } } while(!balanced); }
private void assertNotBalanced(long totalUsedSpace, long totalCapacity, long[] expectedUtilizations) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity); DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL); long[] utilizations = new long[expectedUtilizations.length]; int i = 0; for (DatanodeInfo datanode : datanodeReport) { totalUsedSpace -= datanode.getDfsUsed(); totalCapacity -= datanode.getCapacity(); utilizations[i++] = datanode.getDfsUsed(); } assertEquals(0, totalUsedSpace); assertEquals(0, totalCapacity); assertEquals(expectedUtilizations.length, utilizations.length); Arrays.sort(expectedUtilizations); Arrays.sort(utilizations); assertTrue(Arrays.equals(expectedUtilizations, utilizations)); }
private void setupCluster(Configuration conf, String[] racks, String[] hosts) throws IOException, InterruptedException { // start the cluster with one datanode this.conf = conf; cluster = new MiniDFSCluster(conf, hosts.length, true, racks, hosts); cluster.waitActive(); fs = cluster.getFileSystem(); placementMonitor = new PlacementMonitor(conf); placementMonitor.start(); blockMover = placementMonitor.blockMover; namenode = cluster.getNameNode(); datanodes = namenode.getDatanodeReport(DatanodeReportType.LIVE); // Wait for Livenodes in clusterInfo to be non-null long sTime = System.currentTimeMillis(); while (System.currentTimeMillis() - sTime < 120000 && blockMover.cluster.liveNodes == null) { LOG.info("Waiting for cluster info to add all liveNodes"); Thread.sleep(1000); } }
private String decommissionOneNode() throws IOException { DFSClient client = ((DistributedFileSystem)fileSys).getClient(); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); int index = 0; boolean found = false; while (!found) { index = rand.nextInt(info.length); if (!info[index].isDecommissioned() && !info[index].isDecommissionInProgress()) { found = true; } } String nodename = info[index].getName(); System.out.println("Decommissioning node: " + nodename); // write nodename into the exclude file. decommissionedNodes.add(nodename); writeExcludesFileAndRefresh(decommissionedNodes); return nodename; }
@Test public void testDatanodeStartupDuringFailover() throws Exception { setUp(false, "testDatanodeStartupDuringFailover"); cluster.killPrimary(); cluster.restartDataNodes(false); long start = System.currentTimeMillis(); int live = 0; int total = 3; while (System.currentTimeMillis() - start < 30000 && live != total) { live = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; total = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.ALL).length; } assertEquals(total, live); }
@Test public void testDeadDatanodeFailover() throws Exception { setUp(false, "testDeadDatanodeFailover"); h.setIgnoreDatanodes(false); // Create test files. createTestFiles("/testDeadDatanodeFailover"); cluster.shutDownDataNode(0); FSNamesystem ns = cluster.getStandbyAvatar(0).avatar.namesystem; StandbySafeMode safeMode = cluster.getStandbyAvatar(0).avatar.getStandbySafeMode(); new ExitSafeMode(safeMode, ns).start(); cluster.failOver(); // One datanode should be removed after failover assertEquals(2, cluster.getPrimaryAvatar(0).avatar.namesystem .datanodeReport(DatanodeReportType.LIVE).length); assertTrue(pass); }
public void waitForHeartbeats() throws Exception { DatanodeInfo[] dns = cluster.getPrimaryAvatar(0).avatar .getDatanodeReport(DatanodeReportType.ALL); while (true) { int count = 0; for (DatanodeInfo dn : dns) { if (dn.getRemaining() == 5 * MAX_FILE_SIZE || dn.getRemaining() == 0) { LOG.info("Bad dn : " + dn.getName() + " remaining : " + dn.getRemaining()); count++; } } dns = cluster.getPrimaryAvatar(0).avatar .getDatanodeReport(DatanodeReportType.ALL); if (count == 1) break; LOG.info("Waiting for heartbeats"); Thread.sleep(1000); } }
@Test public void testDatanodeNoService() throws Exception { cluster.shutDownDataNodes(); cluster.killStandby(); cluster.restartStandby(); InjectionHandler.set(new TestHandler()); cluster.restartDataNodes(false); // Wait for trigger. while (!done) { System.out.println("Waiting for trigger"); Thread.sleep(1000); } int dnReports = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < 30000 && dnReports != 1) { System.out.println("Waiting for dn report"); Thread.sleep(1000); dnReports = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; } assertEquals(1, dnReports); assertTrue(pass); assertTrue(done); }
/** Test when standby registration throws IncorrectVersion */ @Test public void testDatanodeVersionStandby() throws Exception { InjectionHandler.set(new TestHandler(2)); cluster.startDataNodes(1, null, null, conf); waitForDone(); int dnReports = cluster.getPrimaryAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; int dnStandbyReports = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < 10000 && dnReports != 1) { System.out.println("Waiting for dn report"); DFSTestUtil.waitSecond(); dnReports = cluster.getPrimaryAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; dnStandbyReports = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; } assertEquals(1, dnReports); assertEquals(0, dnStandbyReports); assertEquals(1, cluster.getDataNodes().size()); assertTrue(cluster.getDataNodes().get(0).isDatanodeUp()); }
@Test public void testDatanodeVersionPrimary() throws Exception { InjectionHandler.set(new TestHandler(1)); cluster.startDataNodes(1, null, null, conf); waitForDone(); int dnReports = cluster.getPrimaryAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; int dnStandbyReports = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < 10000) { System.out.println("Waiting for dn report"); DFSTestUtil.waitSecond();; dnReports = cluster.getPrimaryAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; dnStandbyReports = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; } assertEquals(0, dnReports); assertEquals(1, dnStandbyReports); assertEquals(1, cluster.getDataNodes().size()); assertFalse(cluster.getDataNodes().get(0).isDatanodeUp()); }
/** * Wait until the cluster is active and running. */ public void waitActive() throws IOException { if (nameNode == null) { return; } InetSocketAddress addr = NetUtils.makeSocketAddr("localhost", getNameNodePort()); DFSClient client = new DFSClient(addr, conf); // make sure all datanodes have registered and sent heartbeat while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) { try { Thread.sleep(100); } catch (InterruptedException e) { } } client.close(); System.out.println("Cluster is active"); }
/** * Wait for the given datanode to heartbeat once. */ public void waitForDNHeartbeat(int dnIndex, long timeoutMillis) throws IOException, InterruptedException { DataNode dn = getDataNodes().get(dnIndex); InetSocketAddress addr = new InetSocketAddress("localhost", getNameNodePort()); DFSClient client = new DFSClient(addr, conf); long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() < startTime + timeoutMillis) { DatanodeInfo report[] = client.datanodeReport(DatanodeReportType.LIVE); for (DatanodeInfo thisReport : report) { if (thisReport.getStorageID().equals( dn.dnRegistration.getStorageID())) { if (thisReport.getLastUpdate() > startTime) return; } } Thread.sleep(500); } }
/** * Wait until the cluster is active and running. */ public void waitActive() throws IOException { if (nameNode == null) { return; } InetSocketAddress addr = new InetSocketAddress("localhost", getNameNodePort()); DFSClient client = new DFSClient(addr, conf); // make sure all datanodes have registered and sent heartbeat while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) { try { Thread.sleep(100); } catch (InterruptedException e) { } } client.close(); }
/** * Choose a datanode (hostname:portnumber). The datanode is chosen at random * from the live datanodes. * * @param locationsToAvoid * locations to avoid. * @return A string in the format name:port. * @throws IOException */ private String chooseDatanode(DatanodeInfo[] locationsToAvoid) throws IOException { DistributedFileSystem dfs = getDFS(new Path("/")); DatanodeInfo[] live = dfs.getClient().datanodeReport( DatanodeReportType.LIVE); Random rand = new Random(); String chosen = null; int maxAttempts = 1000; for (int i = 0; i < maxAttempts && chosen == null; i++) { int idx = rand.nextInt(live.length); chosen = live[idx].name; for (DatanodeInfo avoid : locationsToAvoid) { if (chosen.equals(avoid.name)) { //LOG.info("Avoiding " + avoid.name); chosen = null; break; } } } if (chosen == null) { throw new IOException("Could not choose datanode"); } return chosen; }
private DatanodeInfo chooseDatanodeInfo(DatanodeInfo[] locationsToAvoid) throws IOException { DistributedFileSystem dfs = getDFS(new Path("/")); DatanodeInfo[] live = dfs.getClient().datanodeReport( DatanodeReportType.LIVE); Random rand = new Random(); DatanodeInfo chosen = null; int maxAttempts = 1000; for (int i = 0; i < maxAttempts && chosen == null; i++) { int idx = rand.nextInt(live.length); chosen = live[idx]; for (DatanodeInfo avoid : locationsToAvoid) { if (chosen.name.equals(avoid.name)) { chosen = null; break; } } } if (chosen == null) { throw new IOException("Could not choose datanode"); } return chosen; }
/** * Triggers failover processing for safe mode and blocks until we have left * safe mode. * * @throws IOException */ protected void triggerFailover() throws IOException { clearDataStructures(); for (DatanodeInfo node : namesystem.datanodeReport(DatanodeReportType.LIVE)) { liveDatanodes.add(node); outStandingHeartbeats.add(node); } safeModeState = SafeModeState.FAILOVER_IN_PROGRESS; safeModeMonitor = new Daemon(new SafeModeMonitor(namesystem, this)); safeModeMonitor.start(); try { safeModeMonitor.join(); } catch (InterruptedException ie) { throw new IOException("triggerSafeMode() interruped()"); } if (safeModeState != SafeModeState.AFTER_FAILOVER) { throw new RuntimeException("safeModeState is : " + safeModeState + " which does not indicate a successfull exit of safemode"); } }
@Test public void testDatanodeStartupDuringFailover() throws Exception { setUp(false); cluster.killPrimary(); cluster.restartDataNodes(false); long start = System.currentTimeMillis(); int live = 0; int total = 3; while (System.currentTimeMillis() - start < 30000 && live != total) { live = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.LIVE).length; total = cluster.getStandbyAvatar(0).avatar .getDatanodeReport(DatanodeReportType.ALL).length; } assertEquals(total, live); }
@Test public void testDeadDatanodeFailover() throws Exception { setUp(false); h.setIgnoreDatanodes(false); // Create test files. createTestFiles("/testDeadDatanodeFailover"); cluster.shutDownDataNode(0); FSNamesystem ns = cluster.getStandbyAvatar(0).avatar.namesystem; StandbySafeMode safeMode = cluster.getStandbyAvatar(0).avatar.getStandbySafeMode(); new ExitSafeMode(safeMode, ns).start(); cluster.failOver(); // One datanode should be removed after failover assertEquals(2, cluster.getPrimaryAvatar(0).avatar.namesystem .datanodeReport(DatanodeReportType.LIVE).length); assertTrue(pass); }
/** * Wait until the cluster is active and running. */ public void waitActive() throws IOException { if (nameNode == null) { return; } InetSocketAddress addr = new InetSocketAddress("localhost", getNameNodePort()); DFSClient client = new DFSClient(addr, conf); // make sure all datanodes are alive while(client.datanodeReport(DatanodeReportType.LIVE).length != numDataNodes) { try { Thread.sleep(500); } catch (Exception e) { } } client.close(); }
@Test public void testDatanodeBlockScanner() throws IOException { long startTime = System.currentTimeMillis(); Configuration conf = new Configuration(); cluster = new MiniDFSCluster(conf, 1, true, null); cluster.waitActive(); fs = cluster.getFileSystem(); Path file1 = new Path("/tmp/testBlockVerification/file1"); Path file2 = new Path("/tmp/testBlockVerification/file2"); /* * Write the first file and restart the cluster. */ DFSTestUtil.createFile(fs, file1, 10, (short)1, 0); cluster.shutdown(); cluster = new MiniDFSCluster(conf, 1, false, null); cluster.waitActive(); DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); fs = cluster.getFileSystem(); DatanodeInfo dn = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]; /* * The cluster restarted. The block should be verified by now. */ assertTrue(waitForVerification(dn, fs, file1) > startTime); /* * Create a new file and read the block. The block should be marked * verified since the client reads the block and verifies checksum. */ DFSTestUtil.createFile(fs, file2, 10, (short)1, 0); IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(), conf, true); assertTrue(waitForVerification(dn, fs, file2) > startTime); }
@Test public void testDeadDatanode() throws Exception { setUp(3); String fileName = "/test"; Path file = new Path(fileName); DFSTestUtil.createFile(fs, file, BLOCKS, (short) 3, 0); DFSInputStream in = fs.dfs.open(fileName); // 1 block fetched by default during open. assertEquals(1, in.fetchLocatedBlocks().locatedBlockCount()); // 3 locations in client cache. assertEquals(3, in.fetchLocatedBlocks().getLocatedBlocks().get(0) .getLocations().length); int live = cluster.getNameNode().getDatanodeReport(DatanodeReportType.LIVE).length; assertEquals(3, live); cluster.shutdownDataNode(0, false); // Wait for datanode to expire. long start = System.currentTimeMillis(); while (live != 2 && System.currentTimeMillis() - start < 30000) { Thread.sleep(1000); live = cluster.getNameNode().getDatanodeReport(DatanodeReportType.LIVE).length; } assertEquals(2, live); blockRenewalDone = 0; waitForBlockRenewal(); // Dead datanode removed from client cache. assertEquals(2, in.fetchLocatedBlocks().getLocatedBlocks().get(0) .getLocations().length); }
private DatanodeInfo getDataNodeInfo(int nodeIndex) throws IOException { for (DatanodeInfo dataNodeInfo : nameNode.getDatanodeReport(DatanodeReportType.ALL)) { if (dataNodeInfo.getName().equals(favoredHosts[nodeIndex])) { return dataNodeInfo; } } return null; }
@Test public void testStartup() throws IOException { conf = new Configuration(); conf.setClass("dfs.block.replicator.classname", BlockPlacementPolicyConfigurable.class, BlockPlacementPolicy.class); conf.set(FSConstants.DFS_HOSTS, "hosts"); cluster = new MiniDFSCluster(conf, 3, new String[] { "/r1", "/r2", NetworkTopology.DEFAULT_RACK }, null, true, false); DFSTestUtil util = new DFSTestUtil("/testStartup", 10, 10, 1024); util.createFiles(cluster.getFileSystem(), "/"); util.checkFiles(cluster.getFileSystem(), "/"); assertEquals(2, cluster.getNameNode().getDatanodeReport(DatanodeReportType.LIVE).length); cluster.shutdown(); }
private String decommissionNode(FSNamesystem namesystem, Configuration conf, DFSClient client, FileSystem localFileSys, int nodeIndex) throws IOException { DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); String nodename = info[nodeIndex].getName(); System.out.println("Decommissioning node: " + nodename); // write nodename into the exclude file. ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes); nodes.add(nodename); writeConfigFile(localFileSys, excludeFile, nodes); namesystem.refreshNodes(conf); return nodename; }
private void runBenchmark(String testname) { BlockPlacementPolicy policy = cluster.getNameNode().namesystem.replicator; Random r = new Random(); ArrayList <DatanodeDescriptor> dns = cluster.getNameNode().namesystem .getDatanodeListForReport(DatanodeReportType.ALL); long start = System.currentTimeMillis(); for (long i = 0; i < totalRuns; i++) { policy.chooseTarget("", 3, dns.get(r.nextInt(dns.size())), BLOCK_SIZE); } System.out.println("TOTAL TIME FOR " + totalRuns + " runs : of " + testname + " : " + (System.currentTimeMillis() - start)); }
/** * This tests that the over replicated blocks number is consistent after a datanode * goes into decomission and comes back without going down. */ @Test public void testDecommisionExcessBlocks() throws Exception { conf = new Configuration(); conf.setInt("dfs.block.size", 1024); cluster = new MiniDFSCluster(conf, 3, true, null); try { DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/abc"), (long) 1024 * 10, (short) 3, 0); cluster.startDataNodes(conf, 1, true, null, null); FSNamesystem namesystem = cluster.getNameNode().namesystem; DatanodeDescriptor dn = null; for (DatanodeDescriptor dnn : namesystem.getDatanodeListForReport( DatanodeReportType.LIVE)) { if (dnn.numBlocks() != 0) { dn = dnn; break; } } assertNotNull(dn); namesystem.startDecommission(dn); waitForReplication(3); namesystem.stopDecommission(dn); waitForReplication(4); assertEquals(10, namesystem.overReplicatedBlocks.size()); } finally { cluster.shutdown(); } }
private DatanodeInfo decommissionNode(int nnIndex, ArrayList<DatanodeInfo>decommissionedNodes, AdminStates waitForState) throws IOException { DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); // // pick one datanode randomly. // int index = 0; boolean found = false; while (!found) { index = myrand.nextInt(info.length); if (!info[index].isDecommissioned()) { found = true; } } String nodename = info[index].getName(); LOG.info("Decommissioning node: " + nodename); // write nodename into the exclude file. ArrayList<String> nodes = new ArrayList<String>(); if (decommissionedNodes != null) { for (DatanodeInfo dn : decommissionedNodes) { nodes.add(dn.getName()); } } nodes.add(nodename); writeConfigFile(excludeFile, nodes); cluster.getNameNode(nnIndex).namesystem.refreshNodes(conf); DatanodeInfo ret = cluster.getNameNode(nnIndex).namesystem.getDatanode(info[index]); waitNodeState(ret, waitForState); return ret; }
private void testDecommission(int numNamenodes, int numDatanodes, boolean federation) throws IOException { LOG.info("Starting test testDecommission"); startCluster(numNamenodes, numDatanodes, conf, federation); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes); for(int i = 0; i < numNamenodes; i++) { namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes)); } Path file1 = new Path("testDecommission.dat"); for (int iteration = 0; iteration < numDatanodes - 1; iteration++) { int replicas = numDatanodes - iteration - 1; // Start decommissioning one namenode at a time for (int i = 0; i < numNamenodes; i++) { ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i); FileSystem fileSys = cluster.getFileSystem(i); writeFile(fileSys, file1, replicas); // Decommission one node. Verify that node is decommissioned. DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); // Ensure decommissioned datanode is not automatically shutdown DFSClient client = getDfsClient(cluster.getNameNode(i), conf); assertEquals("All datanodes must be alive", numDatanodes, client.datanodeReport(DatanodeReportType.LIVE).length); checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes); cleanupFile(fileSys, file1); } } // Restart the cluster and ensure decommissioned datanodes // are allowed to register with the namenode cluster.shutdown(); startCluster(numNamenodes, numDatanodes, conf, federation); }
public void testHostsFile(int numNameNodes, boolean federation) throws IOException, InterruptedException { conf.set(FSConstants.DFS_HOSTS, hostsFile.toUri().getPath()); int numDatanodes = 1; cluster = new MiniDFSCluster(0, conf, numDatanodes, true, true, true, null, null, null, null, true, true, numNameNodes, federation); cluster.waitActive(); // Now empty hosts file and ensure the datanode is disallowed // from talking to namenode, resulting in it's shutdown. ArrayList<String>list = new ArrayList<String>(); list.add("invalidhost"); writeConfigFile(hostsFile, list); for (int j = 0; j < numNameNodes; j++) { cluster.getNameNode(j).namesystem.refreshNodes(conf); DFSClient client = getDfsClient(cluster.getNameNode(j), conf); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); for (int i = 0 ; i < 20 && info.length != 0; i++) { LOG.info("Waiting for datanode to be marked dead"); Thread.sleep(HEARTBEAT_INTERVAL * 1000); info = client.datanodeReport(DatanodeReportType.LIVE); } assertEquals("Number of live nodes should be 0", 0, info.length); } }
private DatanodeInfo chooseDatanode(DatanodeInfo[] locationsToAvoid) throws IOException { DistributedFileSystem dfs = getDFS(new Path("/")); DatanodeInfo[] live = dfs.getClient().datanodeReport(DatanodeReportType.LIVE); return chooseDatanode(locationsToAvoid, live); }