public void testBlocksScheduledCounter() throws IOException {
  MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
  cluster.waitActive();
  FileSystem fs = cluster.getFileSystem();

  // Open a file and write a few bytes.
  FSDataOutputStream out = fs.create(new Path("/testBlockScheduledCounter"));
  for (int i = 0; i < 1024; i++) {
    out.write(i);
  }
  // Flush to make sure a block is allocated.
  ((DFSOutputStream) out.getWrappedStream()).sync();

  ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
  cluster.getNameNode().namesystem.DFSNodesStatus(dnList, dnList);
  DatanodeDescriptor dn = dnList.get(0);

  assertEquals(1, dn.getBlocksScheduled());

  // Close the file; the counter should go back to zero.
  out.close();
  assertEquals(0, dn.getBlocksScheduled());
}
/**
 * Pushes live/dead datanode metrics and populates the supplied live/dead
 * lists so the caller can reuse them.
 */
private void populateDatanodeMetrics(ArrayList<DatanodeDescriptor> live,
    ArrayList<DatanodeDescriptor> dead) {
  DatanodeStatus status =
      FSNamesystemDatanodeHelper.getDatanodeStats(fsNameSystem, live, dead);

  // Populate metrics.
  numLiveNodes.set(status.numLive);
  numLiveExcludedNodes.set(status.numLiveExcluded);
  numLiveDecommissioningInProgressNodes.set(status.numLiveDecommissioningInProgress);
  numLiveDecommissioned.set(status.numLiveDecommissioned);
  numDeadNodes.set(status.numDead);
  numDeadExcludedNodes.set(status.numDeadExcluded);
  numDeadDecommissioningNotCompletedNodes.set(status.numDeadDecommissioningNotCompleted);
  numDeadDecommissioned.set(status.numDeadDecommissioned);
}
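// Hypothetical usage sketch (not part of the original source): a periodic
// metrics refresh could pass freshly created lists and then reuse the
// populated "live" list for further per-node reporting without another pass
// over the datanode map. The method name doUpdates() is an assumption.
private void doUpdates() {
  ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
  populateDatanodeMetrics(live, dead);
  // "live" and "dead" are now populated and can be reused by the caller.
}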
public void testBlocksScheduledCounter() throws IOException {
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
      .build();
  cluster.waitActive();
  FileSystem fs = cluster.getFileSystem();

  // Open a file and write a few bytes.
  FSDataOutputStream out = fs.create(new Path("/testBlockScheduledCounter"));
  for (int i = 0; i < 1024; i++) {
    out.write(i);
  }
  // Flush to make sure a block is allocated.
  ((DFSOutputStream) out.getWrappedStream()).hflush();

  ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
  cluster.getNamesystem().DFSNodesStatus(dnList, dnList);
  DatanodeDescriptor dn = dnList.get(0);

  assertEquals(1, dn.getBlocksScheduled());

  // Close the file; the counter should go back to zero.
  out.close();
  assertEquals(0, dn.getBlocksScheduled());
}
public void testPseudoSortByDistance() throws Exception {
  DatanodeDescriptor[] testNodes = new DatanodeDescriptor[3];

  // Array contains both the local node and a local-rack node.
  testNodes[0] = dataNodes[1];
  testNodes[1] = dataNodes[2];
  testNodes[2] = dataNodes[0];
  cluster.pseudoSortByDistance(dataNodes[0], testNodes);
  assertTrue(testNodes[0] == dataNodes[0]);
  assertTrue(testNodes[1] == dataNodes[1]);
  assertTrue(testNodes[2] == dataNodes[2]);

  // Array contains the local node.
  testNodes[0] = dataNodes[1];
  testNodes[1] = dataNodes[3];
  testNodes[2] = dataNodes[0];
  cluster.pseudoSortByDistance(dataNodes[0], testNodes);
  assertTrue(testNodes[0] == dataNodes[0]);
  assertTrue(testNodes[1] == dataNodes[1]);
  assertTrue(testNodes[2] == dataNodes[3]);

  // Array contains a local-rack node.
  testNodes[0] = dataNodes[5];
  testNodes[1] = dataNodes[3];
  testNodes[2] = dataNodes[1];
  cluster.pseudoSortByDistance(dataNodes[0], testNodes);
  assertTrue(testNodes[0] == dataNodes[1]);
  assertTrue(testNodes[1] == dataNodes[3]);
  assertTrue(testNodes[2] == dataNodes[5]);
}
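// Hypothetical reconstruction of the test topology (the actual setUp is not
// shown in this excerpt): the assertions in testPseudoSortByDistance above
// and testPipeline below are consistent with seven nodes spread over two
// datacenters, where dataNodes[0..1] share rack /d1/r1, dataNodes[2..4]
// share /d1/r2, and dataNodes[5..6] share /d2/r3. Host names are invented.
private final static DatanodeDescriptor[] dataNodes = new DatanodeDescriptor[] {
    new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
    new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
    new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
    new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
    new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d1/r2"),
    new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3"),
    new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r3")
};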
/**
 * Picks a large number of nodes at random in order to ensure coverage.
 *
 * @param numNodes the number of nodes to pick
 * @param excludedScope the excluded scope
 * @return the frequency with which each node was chosen
 */
private Map<Node, Integer> pickNodesAtRandom(int numNodes, String excludedScope) {
  Map<Node, Integer> frequency = new HashMap<Node, Integer>();
  for (DatanodeDescriptor dnd : dataNodes) {
    frequency.put(dnd, 0);
  }
  for (int j = 0; j < numNodes; j++) {
    Node random = cluster.chooseRandom(excludedScope);
    frequency.put(random, frequency.get(random) + 1);
  }
  return frequency;
}
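// Hypothetical usage sketch: NetworkTopology#chooseRandom interprets a scope
// with a leading "~" as "choose a node outside this scope", so sampling with
// "~/d1" should never pick a node under /d1. The sample size, scope, and
// method name are illustrative assumptions.
private void verifyExcludedScopeNeverChosen() {
  Map<Node, Integer> frequency = pickNodesAtRandom(1000, "~/d1");
  for (Map.Entry<Node, Integer> entry : frequency.entrySet()) {
    if (entry.getKey().getNetworkLocation().startsWith("/d1")) {
      // Nodes inside the excluded scope must never be chosen.
      assertEquals(0, (int) entry.getValue());
    }
  }
}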
/**
 * Does a lot of hacks to change namenode and datanode data structures so
 * that datanodes are identified by machine name rather than IP address.
 * This is done because a unit test can give each datanode a different
 * hostname but not a different IP address.
 *
 * @param cluster the {@link MiniDFSCluster} to operate on
 * @throws Exception
 */
private static void updateDatanodeMap(MiniDFSCluster cluster) throws Exception {
  FSNamesystem namesystem = cluster.getNameNode().namesystem;
  for (DataNode node : cluster.getDataNodes()) {
    // Get the old descriptor.
    DatanodeID dnId = createDataNodeID(node);
    DatanodeDescriptor dnDs = namesystem.getDatanode(dnId);

    // Create a new id and descriptor keyed by machine name.
    DatanodeID newId = new DatanodeID(node.getMachineName(),
        dnDs.getStorageID(), dnDs.getInfoPort(), dnDs.getIpcPort());
    DatanodeDescriptor newDS = new DatanodeDescriptor(newId,
        dnDs.getNetworkLocation(), dnDs.getHostName(), dnDs.getCapacity(),
        dnDs.getDfsUsed(), dnDs.getRemaining(), dnDs.getNamespaceUsed(),
        dnDs.getXceiverCount());
    newDS.isAlive = true;

    // Overwrite the namenode maps with the new descriptor.
    namesystem.writeLock();
    namesystem.clusterMap.remove(dnDs);
    namesystem.resolveNetworkLocation(newDS);
    namesystem.unprotectedAddDatanode(newDS);
    namesystem.clusterMap.add(newDS);
    namesystem.writeUnlock();

    // Overwrite the datanode map with the new registration.
    node.setRegistrationName(node.getMachineName());
  }
}
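// Purely illustrative guess at the createDataNodeID helper referenced above;
// its real implementation is not part of this excerpt. The assumption is that
// it rebuilds the ip:port-style lookup key under which the namenode
// originally registered the datanode.
private static DatanodeID createDataNodeID(DataNode node) {
  InetSocketAddress addr = node.getSelfAddr(); // assumed accessor
  return new DatanodeID(addr.getAddress().getHostAddress() + ":"
      + addr.getPort());
}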
private void checkDecommissionStatus(DatanodeDescriptor decommNode,
    int expectedUnderRep, int expectedDecommissionOnly,
    int expectedUnderRepInOpenFiles) {
  assertEquals(expectedUnderRep,
      decommNode.decommissioningStatus.getUnderReplicatedBlocks());
  assertEquals(expectedDecommissionOnly,
      decommNode.decommissioningStatus.getDecommissionOnlyReplicas());
  assertEquals(expectedUnderRepInOpenFiles,
      decommNode.decommissioningStatus.getUnderReplicatedInOpenFiles());
}
public static long getLiveDatanodeCapacity(FSNamesystem ns) {
  ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
  ns.DFSNodesStatus(live, dead);
  long capacity = 0;
  for (final DatanodeDescriptor dn : live) {
    capacity += dn.getCapacity();
  }
  return capacity;
}
public static void waitForDatanodeStatus(FSNamesystem ns, int expectedLive,
    int expectedDead, long expectedVolFails, long expectedTotalCapacity,
    long timeout) throws InterruptedException, TimeoutException {
  ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
  ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
  final int ATTEMPTS = 10;
  int count = 0;
  long currTotalCapacity = 0;
  int volFails = 0;

  do {
    Thread.sleep(timeout);
    live.clear();
    dead.clear();
    ns.DFSNodesStatus(live, dead);
    currTotalCapacity = 0;
    volFails = 0;
    for (final DatanodeDescriptor dd : live) {
      currTotalCapacity += dd.getCapacity();
      volFails += dd.getVolumeFailures();
    }
    count++;
  } while ((expectedLive != live.size() ||
            expectedDead != dead.size() ||
            expectedTotalCapacity != currTotalCapacity ||
            expectedVolFails != volFails) && count < ATTEMPTS);

  if (count == ATTEMPTS) {
    throw new TimeoutException("Timed out waiting for capacity."
        + " Live = " + live.size() + " Expected = " + expectedLive
        + " Dead = " + dead.size() + " Expected = " + expectedDead
        + " Total capacity = " + currTotalCapacity
        + " Expected = " + expectedTotalCapacity
        + " Vol Fails = " + volFails + " Expected = " + expectedVolFails);
  }
}
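// Hypothetical usage sketch combining the two helpers above: after stopping
// one of three equally sized datanodes, wait until the namenode reports
// 2 live / 1 dead and the live capacity has dropped by one node's share.
// Cluster size, capacities, and the 1-second poll interval are assumptions.
private void exampleWaitAfterDatanodeFailure() throws Exception {
  FSNamesystem ns = cluster.getNamesystem();
  long origCapacity = getLiveDatanodeCapacity(ns);
  long dnCapacity = origCapacity / 3; // assumes three identical datanodes
  cluster.stopDataNode(0);
  waitForDatanodeStatus(ns, 2, 1, 0, origCapacity - dnCapacity, 1000);
}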
public void testPipeline() {
  DatanodeDescriptor[] testNodes = new DatanodeDescriptor[3];

  // Array contains both the local node and a local-rack node.
  testNodes[0] = dataNodes[1];
  testNodes[1] = dataNodes[2];
  testNodes[2] = dataNodes[0];
  cluster.getPipeline(dataNodes[0], testNodes);
  assertTrue(testNodes[0] == dataNodes[0]);
  assertTrue(testNodes[1] == dataNodes[1]);
  assertTrue(testNodes[2] == dataNodes[2]);

  // Array contains neither the local node nor a local-rack node.
  testNodes[0] = dataNodes[5];
  testNodes[1] = dataNodes[3];
  testNodes[2] = dataNodes[2];
  cluster.getPipeline(dataNodes[0], testNodes);
  assertTrue(testNodes[0] == dataNodes[2] && testNodes[1] == dataNodes[3] ||
             testNodes[0] == dataNodes[3] && testNodes[1] == dataNodes[2]);
  assertTrue(testNodes[2] == dataNodes[5]);

  // Array contains a local-rack node.
  testNodes[0] = dataNodes[5];
  testNodes[1] = dataNodes[3];
  testNodes[2] = dataNodes[1];
  cluster.getPipeline(dataNodes[0], testNodes);
  assertTrue(testNodes[0] == dataNodes[1]);
  assertTrue(testNodes[1] == dataNodes[3]);
  assertTrue(testNodes[2] == dataNodes[5]);

  // Two nodes on a different rack, two in a different datacenter.
  testNodes = new DatanodeDescriptor[5];
  testNodes[0] = dataNodes[5];
  testNodes[1] = dataNodes[3];
  testNodes[2] = dataNodes[6];
  testNodes[3] = dataNodes[0];
  testNodes[4] = dataNodes[2];
  cluster.getPipeline(dataNodes[0], testNodes);
  assertTrue(testNodes[0] == dataNodes[0]);
  assertTrue(testNodes[1] == dataNodes[2] && testNodes[2] == dataNodes[3] ||
             testNodes[1] == dataNodes[3] && testNodes[2] == dataNodes[2]);
  assertTrue(testNodes[3] == dataNodes[5] && testNodes[4] == dataNodes[6] ||
             testNodes[3] == dataNodes[6] && testNodes[4] == dataNodes[5]);
}
@Test
public void testDeadDatanodes() throws Exception {
  DFSTestUtil util = new DFSTestUtil("testDeadDatanodes", 1, 1, MAX_FILE_SIZE);
  String topDir = "/testDeadDatanodes";
  util.createFiles(fs, topDir);
  FastCopy fastCopy = new FastCopy(conf);

  // Find the locations for the blocks of the file.
  String filename = util.getFileNames(topDir)[0];
  LocatedBlocks lbks = cluster.getNameNode().getBlockLocations(filename, 0,
      Long.MAX_VALUE);
  assertNotNull(lbks);

  int namespaceID = cluster.getNameNode().getNamespaceID();
  DataNode dn = cluster.getDataNodes().get(0);
  DatanodeID dnId = dn.getDNRegistrationForNS(namespaceID);
  List<Block> deleteList = new ArrayList<Block>();
  for (LocatedBlock block : lbks.getLocatedBlocks()) {
    deleteList.add(block.getBlock());
  }

  assertEquals(lbks.locatedBlockCount(),
      dn.getFSDataset().getBlockReport(namespaceID).length);

  DatanodeDescriptor dnDs = cluster.getNameNode().namesystem.getDatanode(dnId);
  dnDs.addBlocksToBeInvalidated(deleteList);

  // Make sure all blocks are deleted.
  while (dn.getFSDataset().getBlockReport(namespaceID).length != 0) {
    Thread.sleep(1000);
  }

  // Now run FastCopy.
  try {
    for (String fileName : util.getFileNames(topDir)) {
      fastCopy.copy(fileName, fileName + "dst", (DistributedFileSystem) fs,
          (DistributedFileSystem) fs);
    }
  } finally {
    fastCopy.shutdown();
  }

  // Make sure no errors are reported.
  Map<DatanodeInfo, Integer> dnErrors = fastCopy.getDatanodeErrors();
  assertEquals(0, dnErrors.size());
}
/**
 * Tests decommissioning status in DFS.
 */
@Test
public void testDecommissionStatus() throws IOException, InterruptedException {
  InetSocketAddress addr = new InetSocketAddress("localhost",
      cluster.getNameNodePort());
  DFSClient client = new DFSClient(addr, conf);
  DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
  assertEquals("Number of Datanodes ", 2, info.length);
  FileSystem fileSys = cluster.getFileSystem();
  short replicas = 2;

  // Decommission one node. Verify the decommission status.
  Path file1 = new Path("decommission.dat");
  writeFile(fileSys, file1, replicas);

  Path file2 = new Path("decommission1.dat");
  FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas);
  Thread.sleep(5000);

  FSNamesystem fsn = cluster.getNameNode().getNamesystem();
  for (int iteration = 0; iteration < numDatanodes; iteration++) {
    String downnode = decommissionNode(fsn, conf, client, localFileSys,
        iteration);
    decommissionedNodes.add(downnode);
    Thread.sleep(5000);
    ArrayList<DatanodeDescriptor> decommissioningNodes =
        fsn.getDecommissioningNodesList();
    if (iteration == 0) {
      assertEquals(1, decommissioningNodes.size());
      DatanodeDescriptor decommNode = decommissioningNodes.get(0);
      checkDecommissionStatus(decommNode, 4, 0, 2);
    } else {
      assertEquals(2, decommissioningNodes.size());
      DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
      DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
      checkDecommissionStatus(decommNode1, 4, 4, 2);
      checkDecommissionStatus(decommNode2, 4, 4, 2);
    }
  }

  // Call refreshNodes on FSNamesystem with an empty exclude file. This will
  // remove the datanodes from the decommissioning list and make them
  // available again.
  writeConfigFile(localFileSys, excludeFile, null);
  fsn.refreshNodes(conf);
  st1.close();
  cleanupFile(fileSys, file1);
  cleanupFile(fileSys, file2);
  cleanupFile(localFileSys, dir);
}
public void testNameNodeBehavior() throws IOException, ClassNotFoundException,
    InterruptedException {
  setup(2, -1);
  final int fileLenBlocks = STRIPE_LENGTH;
  final int repl = 1;

  // Get set up with datanode references.
  DatanodeInfo[] nodeInfos = namenode.getDatanodeReport(DatanodeReportType.ALL);
  DatanodeDescriptor[] nodes = new DatanodeDescriptor[nodeInfos.length];
  for (int i = 0; i < nodes.length; i++) {
    nodes[i] = namenode.namesystem.getDatanode(nodeInfos[i]);
    LOG.info("nodes[" + i + "]=" + nodes[i].getName());
  }

  // Create a file with one block on nodes[1] and the rest on nodes[0].
  Path raidPath = new Path("/raidrs");
  Path filePath = new Path("/user/hadoop/testNameNodeBehavior/file");
  long[] crc = createRandomFileDispersed(filePath, fileLenBlocks,
      nodes[0], nodes[1]);
  FileStatus file = fileSys.getFileStatus(filePath);

  // Raid the file; parity blocks go on nodes[0].
  BlockPlacementPolicyFakeData.lastInstance.overridingDatanode = nodes[0];
  RaidNode.doRaid(conf, file, raidPath, Codec.getCodec("rs"),
      new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
      false, repl, repl);
  Thread.sleep(1000);
  printFileLocations(file);
  BlockPlacementPolicyFakeData.lastInstance.overridingDatanode = null;

  // Now decommission the second node.
  ArrayList<String> decommissioned = new ArrayList<String>();
  decommissioned.add(nodes[1].getName());
  writeExcludesFileAndRefresh(decommissioned);

  // Wait for the BlockRegenerator to do its thing.
  long now = System.currentTimeMillis();
  BlockIntegrityMonitor bf = raidnode.blockIntegrityMonitor;
  while ((bf.getNumFilesCopied() == 0) && (bf.getNumFileCopyFailures() == 0)
      && ((System.currentTimeMillis() - now) < 30000)) {
    LOG.info("Waiting for the BlockRegenerator to finish... ");
    Thread.sleep(1000);
  }

  // Validate the result.
  printFileLocations(file);
  assertEquals(0, bf.getNumFileCopyFailures());
  assertEquals(1, bf.getNumFilesCopied());

  // No corrupt-block fixing should have happened.
  assertEquals("corrupt block fixer unexpectedly performed fixing",
      0, bf.getNumFilesFixed());
  assertEquals("corrupt block fixer unexpectedly attempted fixing",
      0, bf.getNumFileFixFailures());

  validateFileCopy(fileSys, filePath, file.getLen(), crc, false);
  teardown();
}