/**
 * Test directories with Hot, Warm and Cold policies.
 *
 * Runs the basic migration scenario, then asks the policy map to move
 * things around in the live file system and checks that a second migration
 * still succeeds and verifies cleanly.
 */
@Test
public void testHotWarmColdDirs() throws Exception {
  LOG.info("testHotWarmColdDirs");
  final PathPolicyMap policies = new PathPolicyMap(3);
  final NamespaceScheme namespace = policies.newNamespaceScheme();
  final ClusterScheme clusterLayout = new ClusterScheme();
  final MigrationTest migration = new MigrationTest(clusterLayout, namespace);

  try {
    // 'false' is the shutdown flag: the cluster is still needed for the
    // second round below, so it is torn down in the finally block instead.
    migration.runBasicTest(false);

    policies.moveAround(migration.dfs);
    migration.migrate(ExitStatus.SUCCESS);
    migration.verify(true);
  } finally {
    migration.shutdownCluster();
  }
}
@Test public void testMoverFailedRetry() throws Exception { // HDFS-8147 final Configuration conf = new HdfsConfiguration(); conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2"); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3) .storageTypes( new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK, StorageType.ARCHIVE}}).build(); try { cluster.waitActive(); final DistributedFileSystem dfs = cluster.getFileSystem(); final String file = "/testMoverFailedRetry"; // write to DISK final FSDataOutputStream out = dfs.create(new Path(file), (short) 2); out.writeChars("testMoverFailedRetry"); out.close(); // Delete block file so, block move will fail with FileNotFoundException LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock()); // move to ARCHIVE dfs.setStoragePolicy(new Path(file), "COLD"); int rc = ToolRunner.run(conf, new Mover.Cli(), new String[] {"-p", file.toString()}); Assert.assertEquals("Movement should fail after some retry", ExitStatus.IO_EXCEPTION.getExitCode(), rc); } finally { cluster.shutdown(); } }
/**
 * Runs the Mover once against every namenode service URI found in
 * {@code conf} and asserts that it exits with SUCCESS.
 *
 * Each namenode is registered with a {@code null} path list.
 */
private void runMover() throws Exception {
  final Map<URI, List<Path>> pathsByNamenode = Maps.newHashMap();
  for (URI namenode : DFSUtil.getNsServiceRpcUris(conf)) {
    pathsByNamenode.put(namenode, null);
  }

  final int exitCode = Mover.run(pathsByNamenode, conf);
  Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), exitCode);
}
/**
 * Run Mover with arguments specifying files and directories.
 *
 * /foo is tagged COLD and /foo2 is tagged WARM; the Mover is then pointed
 * at the single file /foo/bar and the directory /foo2 via "-p".
 */
@Test
public void testMoveSpecificPaths() throws Exception {
  LOG.info("testMoveSpecificPaths");

  final Path coldDir = new Path("/foo");
  final Path coldFile = new Path(coldDir, "bar");
  final Path warmDir = new Path("/foo2");
  final Path warmFile = new Path(warmDir, "bar2");

  final Map<Path, BlockStoragePolicy> policies = Maps.newHashMap();
  policies.put(coldDir, COLD);
  policies.put(warmDir, WARM);

  final List<Path> dirs = Arrays.asList(coldDir, warmDir);
  final List<Path> files = Arrays.asList(coldFile, warmFile);
  final NamespaceScheme namespace =
      new NamespaceScheme(dirs, files, BLOCK_SIZE, null, policies);
  final ClusterScheme clusterLayout = new ClusterScheme(DEFAULT_CONF,
      NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
  final MigrationTest migration = new MigrationTest(clusterLayout, namespace);

  migration.setupCluster();
  try {
    migration.prepareNamespace();
    migration.setStoragePolicy();

    final Map<URI, List<Path>> targets = Mover.Cli.getNameNodePathsToMove(
        migration.conf, "-p", "/foo/bar", "/foo2");
    final int exitCode = Mover.run(targets, migration.conf);
    Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), exitCode);

    // Pause before verifying, mirroring the wait used by migrate().
    Thread.sleep(5000);
    migration.verify(true);
  } finally {
    migration.shutdownCluster();
  }
}
/**
 * @return NO_MOVE_PROGRESS if there was no progress in moving after some
 *         retries. Return SUCCESS if all moves succeeded and there is no
 *         remaining move. Return NO_MOVE_BLOCK if there are moves available
 *         but none of them could be scheduled. Otherwise, return
 *         IN_PROGRESS since there must be some remaining moves.
 */
ExitStatus getExitStatus() {
  // The retry-failed flag takes precedence over every other state.
  if (retryFailed) {
    return ExitStatus.NO_MOVE_PROGRESS;
  }
  if (!isHasRemaining()) {
    return ExitStatus.SUCCESS;
  }
  if (isNoBlockMoved()) {
    return ExitStatus.NO_MOVE_BLOCK;
  }
  return ExitStatus.IN_PROGRESS;
}
@Test(timeout = 300000) public void testMoveWhenStoragePolicyNotSatisfying() throws Exception { // HDFS-8147 final Configuration conf = new HdfsConfiguration(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3) .storageTypes( new StorageType[][] { { StorageType.DISK }, { StorageType.DISK }, { StorageType.DISK } }).build(); try { cluster.waitActive(); final DistributedFileSystem dfs = cluster.getFileSystem(); final String file = "/testMoveWhenStoragePolicyNotSatisfying"; // write to DISK final FSDataOutputStream out = dfs.create(new Path(file)); out.writeChars("testMoveWhenStoragePolicyNotSatisfying"); out.close(); // move to ARCHIVE dfs.setStoragePolicy(new Path(file), "COLD"); int rc = ToolRunner.run(conf, new Mover.Cli(), new String[] { "-p", file.toString() }); int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode(); Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc); } finally { cluster.shutdown(); } }
@Test public void testMoverFailedRetry() throws Exception { // HDFS-8147 final Configuration conf = new HdfsConfiguration(); initConf(conf); conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2"); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3) .storageTypes( new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK, StorageType.ARCHIVE}}).build(); try { cluster.waitActive(); final DistributedFileSystem dfs = cluster.getFileSystem(); final String file = "/testMoverFailedRetry"; // write to DISK final FSDataOutputStream out = dfs.create(new Path(file), (short) 2); out.writeChars("testMoverFailedRetry"); out.close(); // Delete block file so, block move will fail with FileNotFoundException LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock()); // move to ARCHIVE dfs.setStoragePolicy(new Path(file), "COLD"); int rc = ToolRunner.run(conf, new Mover.Cli(), new String[] {"-p", file.toString()}); Assert.assertEquals("Movement should fail after some retry", ExitStatus.NO_MOVE_PROGRESS.getExitCode(), rc); } finally { cluster.shutdown(); } }
/**
 * Runs the basic migration scenario: set up the cluster, prepare the
 * namespace, verify, apply the storage policies, migrate expecting
 * SUCCESS, and verify again.
 *
 * @param shutdown if true the cluster is shut down when the scenario ends
 *                 (also on failure); if false it is left running and the
 *                 caller is responsible for shutting it down
 */
private void runBasicTest(boolean shutdown) throws Exception {
  // Cluster setup happens outside the try, so a failed setup does not
  // trigger shutdownCluster().
  setupCluster();
  try {
    prepareNamespace();
    // First verification runs before any storage policy is applied.
    verify(true);

    setStoragePolicy();
    migrate(ExitStatus.SUCCESS);
    // Second verification runs after the migration.
    verify(true);
  } finally {
    if (shutdown) {
      shutdownCluster();
    }
  }
}
/**
 * Runs the Mover once against every namenode service URI found in
 * {@code conf} and asserts that its exit code matches the expected status.
 *
 * Each namenode is registered with a {@code null} path list.
 *
 * @param expectedExitCode the status whose exit code the Mover must return
 */
private void runMover(ExitStatus expectedExitCode) throws Exception {
  final Map<URI, List<Path>> pathsByNamenode = Maps.newHashMap();
  for (URI namenode : DFSUtil.getNsServiceRpcUris(conf)) {
    pathsByNamenode.put(namenode, null);
  }

  final int actual = Mover.run(pathsByNamenode, conf);
  Assert.assertEquals(expectedExitCode.getExitCode(), actual);
}
/** * Run the migration tool. */ void migrate(ExitStatus expectedExitCode) throws Exception { runMover(expectedExitCode); Thread.sleep(5000); // let the NN finish deletion }
/** * Move an open file into archival storage */ @Test public void testMigrateOpenFileToArchival() throws Exception { LOG.info("testMigrateOpenFileToArchival"); final Path fooDir = new Path("/foo"); Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap(); policyMap.put(fooDir, COLD); NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(fooDir), null, BLOCK_SIZE, null, policyMap); ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); MigrationTest test = new MigrationTest(clusterScheme, nsScheme); test.setupCluster(); // create an open file banner("writing to file /foo/bar"); final Path barFile = new Path(fooDir, "bar"); DFSTestUtil.createFile(test.dfs, barFile, BLOCK_SIZE, (short) 1, 0L); FSDataOutputStream out = test.dfs.append(barFile); out.writeBytes("hello, "); ((DFSOutputStream) out.getWrappedStream()).hsync(); try { banner("start data migration"); test.setStoragePolicy(); // set /foo to COLD test.migrate(ExitStatus.SUCCESS); // make sure the under construction block has not been migrated LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks( barFile.toString(), BLOCK_SIZE); LOG.info("Locations: " + lbs); List<LocatedBlock> blks = lbs.getLocatedBlocks(); Assert.assertEquals(1, blks.size()); Assert.assertEquals(1, blks.get(0).getLocations().length); banner("finish the migration, continue writing"); // make sure the writing can continue out.writeBytes("world!"); ((DFSOutputStream) out.getWrappedStream()).hsync(); IOUtils.cleanup(LOG, out); lbs = test.dfs.getClient().getLocatedBlocks( barFile.toString(), BLOCK_SIZE); LOG.info("Locations: " + lbs); blks = lbs.getLocatedBlocks(); Assert.assertEquals(1, blks.size()); Assert.assertEquals(1, blks.get(0).getLocations().length); banner("finish writing, starting reading"); // check the content of /foo/bar FSDataInputStream in = test.dfs.open(barFile); byte[] buf = new byte[13]; // read from offset 1024 in.readFully(BLOCK_SIZE, buf, 0, 
buf.length); IOUtils.cleanup(LOG, in); Assert.assertEquals("hello, world!", new String(buf)); } finally { test.shutdownCluster(); } }