/**
 * Filesystem checker.
 *
 * <p>Stores the supplied collaborators, creates the block placement policy
 * used by this fsck, and then translates the request parameters in
 * {@code pmap} into the corresponding option fields.
 *
 * @param conf configuration (namenode config)
 * @param namenode namenode that this fsck is going to use
 * @param networktopology network topology handed to the block placement
 *        policy factory
 * @param pmap key=value[] map passed to the http servlet as url parameters
 * @param out output stream to write the fsck output
 * @param totalDatanodes number of live datanodes
 * @param remoteAddress source address of the fsck request
 */
NamenodeFsck(Configuration conf, NameNode namenode,
    NetworkTopology networktopology,
    Map<String,String[]> pmap, PrintWriter out,
    int totalDatanodes, InetAddress remoteAddress) {
  this.conf = conf;
  this.namenode = namenode;
  this.networktopology = networktopology;
  this.out = out;
  this.totalDatanodes = totalDatanodes;
  this.remoteAddress = remoteAddress;
  this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
      networktopology,
      namenode.getNamesystem().getBlockManager().getDatanodeManager()
          .getHost2DatanodeMap());

  for (String key : pmap.keySet()) {
    // Options that carry a value: only the first value of each is used.
    if (key.equals("path")) {
      this.path = pmap.get(key)[0];
    } else if (key.equals("startblockafter")) {
      this.currentCookie[0] = pmap.get(key)[0];
    } else if (key.equals("blockId")) {
      this.blockIds = pmap.get(key)[0];
    // Flag options: the mere presence of the key enables them.
    } else if (key.equals("move")) {
      this.doMove = true;
    } else if (key.equals("delete")) {
      this.doDelete = true;
    } else if (key.equals("files")) {
      this.showFiles = true;
    } else if (key.equals("blocks")) {
      this.showBlocks = true;
    } else if (key.equals("locations")) {
      this.showLocations = true;
    } else if (key.equals("racks")) {
      this.showRacks = true;
    } else if (key.equals("storagepolicies")) {
      this.showStoragePolcies = true;
    } else if (key.equals("openforwrite")) {
      this.showOpenFiles = true;
    } else if (key.equals("listcorruptfileblocks")) {
      this.showCorruptFileBlocks = true;
    } else if (key.equals("includeSnapshots")) {
      // Starts out as an empty list.
      this.snapshottableDirs = new ArrayList<String>();
    }
  }
}
/**
 * Rejects configurations whose block placement policy is not a
 * {@link BlockPlacementPolicyDefault} (subclasses are accepted).
 *
 * @param conf configuration from which the block placement policy is created
 * @throws UnsupportedActionException if the created policy is not an instance
 *         of {@link BlockPlacementPolicyDefault}
 */
private static void checkReplicationPolicyCompatibility(Configuration conf)
    throws UnsupportedActionException {
  final BlockPlacementPolicy policy =
      BlockPlacementPolicy.getInstance(conf, null, null, null);
  if (policy instanceof BlockPlacementPolicyDefault) {
    return;
  }
  throw new UnsupportedActionException(
      "Balancer without BlockPlacementPolicyDefault");
}
@Before public void setupCluster() throws Exception { conf = new Configuration(); conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, SMALL_BLOCK); // Bump up replication interval so that we only run replication // checks explicitly. conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 600); // Increase max streams so that we re-replicate quickly. conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000); // See RandomDeleterPolicy javadoc. conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, RandomDeleterPolicy.class, BlockPlacementPolicy.class); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(3) .build(); nn1 = cluster.getNameNode(0); nn2 = cluster.getNameNode(1); cluster.waitActive(); cluster.transitionToActive(0); // Trigger block reports so that the first NN trusts all // of the DNs, and will issue deletions cluster.triggerBlockReports(); fs = HATestUtil.configureFailoverFs(cluster, conf); }
private void testDeleteAddBlockRace(boolean hasSnapshot) throws Exception { try { conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, SlowBlockPlacementPolicy.class, BlockPlacementPolicy.class); cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fs = cluster.getFileSystem(); final String fileName = "/testDeleteAddBlockRace"; Path filePath = new Path(fileName); FSDataOutputStream out = null; out = fs.create(filePath); if (hasSnapshot) { SnapshotTestHelper.createSnapshot((DistributedFileSystem) fs, new Path( "/"), "s1"); } Thread deleteThread = new DeleteThread(fs, filePath); deleteThread.start(); try { // write data and syn to make sure a block is allocated. out.write(new byte[32], 0, 32); out.hsync(); Assert.fail("Should have failed."); } catch (FileNotFoundException e) { GenericTestUtils.assertExceptionContains(filePath.getName(), e); } } finally { if (cluster != null) { cluster.shutdown(); } } }
@Test public void testRenameRace() throws Exception { try { conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, SlowBlockPlacementPolicy.class, BlockPlacementPolicy.class); cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fs = cluster.getFileSystem(); Path dirPath1 = new Path("/testRenameRace1"); Path dirPath2 = new Path("/testRenameRace2"); Path filePath = new Path("/testRenameRace1/file1"); fs.mkdirs(dirPath1); FSDataOutputStream out = fs.create(filePath); Thread renameThread = new RenameThread(fs, dirPath1, dirPath2); renameThread.start(); // write data and close to make sure a block is allocated. out.write(new byte[32], 0, 32); out.close(); // Restart name node so that it replays edit. If old path was // logged in edit, it will fail to come up. cluster.restartNameNode(0); } finally { if (cluster != null) { cluster.shutdown(); } } }
/**
 * Starts a mini cluster of 20 datanodes laid out as 10 racks
 * ("/rack0".."/rack9") with 2 hosts each ("/host00".."/host91"), using
 * {@link BlockPlacementPolicyRackFaultTolerant} as the block placement policy.
 *
 * @throws IOException if the cluster fails to start
 */
@Before
public void setup() throws IOException {
  StaticMapping.resetMap();
  Configuration conf = new HdfsConfiguration();

  final int rackCount = 10;
  final int hostsPerRack = 2;
  final String[] racks = new String[rackCount * hostsPerRack];
  final String[] hosts = new String[racks.length];
  for (int slot = 0; slot < racks.length; slot++) {
    final int rackIdx = slot / hostsPerRack;
    final int hostIdx = slot % hostsPerRack;
    racks[slot] = "/rack" + rackIdx;
    hosts[slot] = "/host" + rackIdx + hostIdx;
  }

  conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
      BlockPlacementPolicyRackFaultTolerant.class,
      BlockPlacementPolicy.class);
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
  conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
      DEFAULT_BLOCK_SIZE / 2);

  cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(hosts.length)
      .racks(racks)
      .hosts(hosts)
      .build();
  cluster.waitActive();

  nameNodeRpc = cluster.getNameNodeRpc();
  namesystem = cluster.getNamesystem();
  perm = new PermissionStatus("TestBlockPlacementPolicyEC", null,
      FsPermission.getDefault());
}
/**
 * Verify balancer won't violate upgrade domain block placement policy.
 *
 * <p>Starts from three datanodes (host0 on RACK0, host1 and host2 on RACK1,
 * upgrade domains ud0..ud2) and adds "host3" on RACK2 in upgrade domain
 * "ud2" before the balancer runs.
 *
 * @throws Exception on any failure while running the balancer scenario
 */
@Test(timeout=100000)
public void testUpgradeDomainPolicyAfterBalance() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);
  conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
      BlockPlacementPolicyWithUpgradeDomain.class,
      BlockPlacementPolicy.class);

  final long[] nodeCapacities = {CAPACITY, CAPACITY, CAPACITY};
  final String[] hostNames = {"host0", "host1", "host2"};
  final String[] rackNames = {RACK0, RACK1, RACK1};
  final String[] upgradeDomains = {"ud0", "ud1", "ud2"};

  runBalancerAndVerifyBlockPlacmentPolicy(conf, nodeCapacities, hostNames,
      rackNames, upgradeDomains, CAPACITY, "host3", RACK2, "ud2");
}
/**
 * Filesystem checker.
 *
 * <p>Stores the supplied collaborators, creates the block placement policy
 * used by this fsck, and then translates the request parameters in
 * {@code pmap} into the corresponding option fields.
 *
 * @param conf configuration (namenode config)
 * @param namenode namenode that this fsck is going to use
 * @param networktopology network topology handed to the block placement
 *        policy factory
 * @param pmap key=value[] map passed to the http servlet as url parameters
 * @param out output stream to write the fsck output
 * @param totalDatanodes number of live datanodes
 * @param minReplication minimum replication
 * @param remoteAddress source address of the fsck request
 */
NamenodeFsck(Configuration conf, NameNode namenode,
    NetworkTopology networktopology,
    Map<String,String[]> pmap, PrintWriter out,
    int totalDatanodes, short minReplication, InetAddress remoteAddress) {
  this.conf = conf;
  this.namenode = namenode;
  this.networktopology = networktopology;
  this.out = out;
  this.totalDatanodes = totalDatanodes;
  this.minReplication = minReplication;
  this.remoteAddress = remoteAddress;
  this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
      networktopology,
      namenode.getNamesystem().getBlockManager().getDatanodeManager()
          .getHost2DatanodeMap());

  for (String key : pmap.keySet()) {
    // Options that carry a value: only the first value of each is used.
    if (key.equals("path")) {
      this.path = pmap.get(key)[0];
    } else if (key.equals("startblockafter")) {
      this.currentCookie[0] = pmap.get(key)[0];
    // Flag options: the mere presence of the key enables them.
    } else if (key.equals("move")) {
      this.doMove = true;
    } else if (key.equals("delete")) {
      this.doDelete = true;
    } else if (key.equals("files")) {
      this.showFiles = true;
    } else if (key.equals("blocks")) {
      this.showBlocks = true;
    } else if (key.equals("locations")) {
      this.showLocations = true;
    } else if (key.equals("racks")) {
      this.showRacks = true;
    } else if (key.equals("openforwrite")) {
      this.showOpenFiles = true;
    } else if (key.equals("listcorruptfileblocks")) {
      this.showCorruptFileBlocks = true;
    } else if (key.equals("includeSnapshots")) {
      // Starts out as an empty list.
      this.snapshottableDirs = new ArrayList<String>();
    }
  }
}
/**
 * Rejects configurations whose block placement policy is not a
 * {@link BlockPlacementPolicyDefault} (subclasses are accepted).
 *
 * @param conf configuration from which the block placement policy is created
 * @throws UnsupportedActionException if the created policy is not an instance
 *         of {@link BlockPlacementPolicyDefault}
 */
private static void checkReplicationPolicyCompatibility(Configuration conf)
    throws UnsupportedActionException {
  final BlockPlacementPolicy policy =
      BlockPlacementPolicy.getInstance(conf, null, null);
  if (policy instanceof BlockPlacementPolicyDefault) {
    return;
  }
  throw new UnsupportedActionException(
      "Balancer without BlockPlacementPolicyDefault");
}
@Before public void setupCluster() throws Exception { conf = new Configuration(); conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, SMALL_BLOCK); // Bump up replication interval so that we only run replication // checks explicitly. conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 600); // Increase max streams so that we re-replicate quickly. conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000); // See RandomDeleterPolicy javadoc. conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class, BlockPlacementPolicy.class); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(3) .build(); nn1 = cluster.getNameNode(0); nn2 = cluster.getNameNode(1); cluster.waitActive(); cluster.transitionToActive(0); // Trigger block reports so that the first NN trusts all // of the DNs, and will issue deletions cluster.triggerBlockReports(); fs = HATestUtil.configureFailoverFs(cluster, conf); }
/**
 * Rejects configurations whose block placement policy class is anything
 * other than exactly {@link BlockPlacementPolicyDefault}; the runtime class
 * is compared directly, so subclasses are rejected as well.
 *
 * @param conf configuration from which the block placement policy is created
 * @throws UnsupportedActionException if the created policy's class is not
 *         {@link BlockPlacementPolicyDefault}
 */
private static void checkReplicationPolicyCompatibility(Configuration conf)
    throws UnsupportedActionException {
  final Class<?> policyClass =
      BlockPlacementPolicy.getInstance(conf, null, null).getClass();
  if (policyClass == BlockPlacementPolicyDefault.class) {
    return;
  }
  throw new UnsupportedActionException(
      "Balancer without BlockPlacementPolicyDefault");
}
/**
 * Filesystem checker.
 *
 * <p>Stores the supplied collaborators, creates the block placement policy
 * used by this fsck, and then translates the request parameters in
 * {@code pmap} into the corresponding option fields.
 *
 * @param conf configuration (namenode config)
 * @param namenode namenode that this fsck is going to use
 * @param networktopology network topology handed to the block placement
 *        policy factory
 * @param pmap key=value[] map passed to the http servlet as url parameters
 * @param out output stream to write the fsck output
 * @param totalDatanodes number of live datanodes
 * @param minReplication minimum replication
 * @param remoteAddress source address of the fsck request
 */
NamenodeFsck(Configuration conf, NameNode namenode,
    NetworkTopology networktopology,
    Map<String,String[]> pmap, PrintWriter out,
    int totalDatanodes, short minReplication, InetAddress remoteAddress) {
  this.conf = conf;
  this.namenode = namenode;
  this.networktopology = networktopology;
  this.out = out;
  this.totalDatanodes = totalDatanodes;
  this.minReplication = minReplication;
  this.remoteAddress = remoteAddress;
  this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
      networktopology);

  for (String key : pmap.keySet()) {
    // Options that carry a value: only the first value of each is used.
    if (key.equals("path")) {
      this.path = pmap.get(key)[0];
    } else if (key.equals("startblockafter")) {
      this.currentCookie[0] = pmap.get(key)[0];
    // Flag options: the mere presence of the key enables them.
    } else if (key.equals("move")) {
      this.doMove = true;
    } else if (key.equals("delete")) {
      this.doDelete = true;
    } else if (key.equals("files")) {
      this.showFiles = true;
    } else if (key.equals("blocks")) {
      this.showBlocks = true;
    } else if (key.equals("locations")) {
      this.showLocations = true;
    } else if (key.equals("racks")) {
      this.showRacks = true;
    } else if (key.equals("openforwrite")) {
      this.showOpenFiles = true;
    } else if (key.equals("listcorruptfileblocks")) {
      this.showCorruptFileBlocks = true;
    } else if (key.equals("includeSnapshots")) {
      // Starts out as an empty list.
      this.snapshottableDirs = new ArrayList<String>();
    }
  }
}
/**
 * Builds a mini cluster, fills it to 80% of its total capacity with one file,
 * adds a single empty datanode, runs the balancer and finally asserts that
 * every block of the file satisfies the cluster's block placement policy.
 *
 * @param conf configuration used for the cluster, the client proxy and the
 *        balancer
 * @param capacities simulated capacity of each initial datanode; its length
 *        is the initial number of datanodes
 * @param hosts host names of the initial datanodes
 * @param racks racks of the initial datanodes
 * @param UDs upgrade domains assigned to the initial datanodes by index, or
 *        null to assign none
 * @param newCapacity simulated capacity of the datanode added later
 * @param newHost host name of the datanode added later
 * @param newRack rack of the datanode added later
 * @param newUD upgrade domain of the datanode added later, or null for none
 * @throws Exception on any failure while setting up or running the scenario
 */
private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
    long[] capacities, String[] hosts, String[] racks, String[] UDs,
    long newCapacity, String newHost, String newRack, String newUD)
    throws Exception {
  final int numOfDatanodes = capacities.length;
  cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(numOfDatanodes)
      .hosts(hosts)
      .racks(racks)
      .simulatedCapacities(capacities)
      .build();
  final DatanodeManager datanodeManager =
      cluster.getNamesystem().getBlockManager().getDatanodeManager();
  if (UDs != null) {
    for (int idx = 0; idx < UDs.length; idx++) {
      final DatanodeID id = cluster.getDataNodes().get(idx).getDatanodeId();
      datanodeManager.getDatanode(id).setUpgradeDomain(UDs[idx]);
    }
  }

  try {
    cluster.waitActive();
    client = NameNodeProxies.createProxy(conf,
        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();

    // Fill the cluster to 80% of its total capacity with a single file whose
    // replication factor equals the number of initial datanodes.
    long totalCapacity = sum(capacities);
    final long totalUsedSpace = totalCapacity * 8 / 10;
    final long fileSize = totalUsedSpace / numOfDatanodes;
    DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
        fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);

    // Add one more, empty datanode with the supplied rack, host name and
    // capacity.
    cluster.startDataNodes(conf, 1, true, null,
        new String[] {newRack}, new String[] {newHost},
        new long[] {newCapacity});
    if (newUD != null) {
      // The freshly started node is the one right after the initial nodes.
      final DatanodeID addedId =
          cluster.getDataNodes().get(numOfDatanodes).getDatanodeId();
      datanodeManager.getDatanode(addedId).setUpgradeDomain(newUD);
    }
    totalCapacity += newCapacity;

    // Wait on heartbeats with the expected used space and new total capacity
    // before balancing.
    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

    // Run the balancer with its default parameters.
    final Collection<URI> nameNodeUris = DFSUtil.getNsServiceRpcUris(conf);
    Balancer.run(nameNodeUris, BalancerParameters.DEFAULT, conf);

    // Every block of the file must still satisfy the placement policy.
    final BlockPlacementPolicy policy =
        cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
    final List<LocatedBlock> blocks =
        client.getBlockLocations(fileName, 0, fileSize).getLocatedBlocks();
    for (LocatedBlock block : blocks) {
      final BlockPlacementStatus placement = policy.verifyBlockPlacement(
          block.getLocations(), numOfDatanodes);
      assertTrue(placement.isPlacementPolicySatisfied());
    }
  } finally {
    cluster.shutdown();
  }
}