public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( CacheDirectiveInfo filter) throws IOException { return new CacheDirectiveIterator(namenode, filter, traceSampler); }
@Test(timeout=120000) public void testWaitForCachedReplicas() throws Exception { FileSystemTestHelper helper = new FileSystemTestHelper(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { return ((namenode.getNamesystem().getCacheCapacity() == (NUM_DATANODES * CACHE_CAPACITY)) && (namenode.getNamesystem().getCacheUsed() == 0)); } }, 500, 60000); // Send a cache report referring to a bogus block. It is important that // the NameNode be robust against this. NamenodeProtocols nnRpc = namenode.getRpcServer(); DataNode dn0 = cluster.getDataNodes().get(0); String bpid = cluster.getNamesystem().getBlockPoolId(); LinkedList<Long> bogusBlockIds = new LinkedList<Long> (); bogusBlockIds.add(999999L); nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds); Path rootDir = helper.getDefaultWorkingDirectory(dfs); // Create the pool final String pool = "friendlyPool"; nnRpc.addCachePool(new CachePoolInfo("friendlyPool")); // Create some test files final int numFiles = 2; final int numBlocksPerFile = 2; final List<String> paths = new ArrayList<String>(numFiles); for (int i=0; i<numFiles; i++) { Path p = new Path(rootDir, "testCachePaths-" + i); FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile, (int)BLOCK_SIZE); paths.add(p.toUri().getPath()); } // Check the initial statistics at the namenode waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0"); // Cache and check each path in sequence int expected = 0; for (int i=0; i<numFiles; i++) { CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder(). setPath(new Path(paths.get(i))). setPool(pool). build(); nnRpc.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class)); expected += numBlocksPerFile; waitForCachedBlocks(namenode, expected, expected, "testWaitForCachedReplicas:1"); } // Check that the datanodes have the right cache values DatanodeInfo[] live = dfs.getDataNodeStats(DatanodeReportType.LIVE); assertEquals("Unexpected number of live nodes", NUM_DATANODES, live.length); long totalUsed = 0; for (DatanodeInfo dn : live) { final long cacheCapacity = dn.getCacheCapacity(); final long cacheUsed = dn.getCacheUsed(); final long cacheRemaining = dn.getCacheRemaining(); assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Capacity not equal to used + remaining", cacheCapacity, cacheUsed + cacheRemaining); assertEquals("Remaining not equal to capacity - used", cacheCapacity - cacheUsed, cacheRemaining); totalUsed += cacheUsed; } assertEquals(expected*BLOCK_SIZE, totalUsed); // Uncache and check each path in sequence RemoteIterator<CacheDirectiveEntry> entries = new CacheDirectiveIterator(nnRpc, null, Sampler.NEVER); for (int i=0; i<numFiles; i++) { CacheDirectiveEntry entry = entries.next(); nnRpc.removeCacheDirective(entry.getInfo().getId()); expected -= numBlocksPerFile; waitForCachedBlocks(namenode, expected, expected, "testWaitForCachedReplicas:2"); } }
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( CacheDirectiveInfo filter) throws IOException { checkOpen(); return new CacheDirectiveIterator(namenode, filter, tracer); }
@Test(timeout=120000) public void testWaitForCachedReplicas() throws Exception { FileSystemTestHelper helper = new FileSystemTestHelper(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { return ((namenode.getNamesystem().getCacheCapacity() == (NUM_DATANODES * CACHE_CAPACITY)) && (namenode.getNamesystem().getCacheUsed() == 0)); } }, 500, 60000); // Send a cache report referring to a bogus block. It is important that // the NameNode be robust against this. NamenodeProtocols nnRpc = namenode.getRpcServer(); DataNode dn0 = cluster.getDataNodes().get(0); String bpid = cluster.getNamesystem().getBlockPoolId(); LinkedList<Long> bogusBlockIds = new LinkedList<Long> (); bogusBlockIds.add(999999L); nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds); Path rootDir = helper.getDefaultWorkingDirectory(dfs); // Create the pool final String pool = "friendlyPool"; nnRpc.addCachePool(new CachePoolInfo("friendlyPool")); // Create some test files final int numFiles = 2; final int numBlocksPerFile = 2; final List<String> paths = new ArrayList<String>(numFiles); for (int i=0; i<numFiles; i++) { Path p = new Path(rootDir, "testCachePaths-" + i); FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile, (int)BLOCK_SIZE); paths.add(p.toUri().getPath()); } // Check the initial statistics at the namenode waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0"); // Cache and check each path in sequence int expected = 0; for (int i=0; i<numFiles; i++) { CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder(). setPath(new Path(paths.get(i))). setPool(pool). build(); nnRpc.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class)); expected += numBlocksPerFile; waitForCachedBlocks(namenode, expected, expected, "testWaitForCachedReplicas:1"); } // Check that the datanodes have the right cache values DatanodeInfo[] live = dfs.getDataNodeStats(DatanodeReportType.LIVE); assertEquals("Unexpected number of live nodes", NUM_DATANODES, live.length); long totalUsed = 0; for (DatanodeInfo dn : live) { final long cacheCapacity = dn.getCacheCapacity(); final long cacheUsed = dn.getCacheUsed(); final long cacheRemaining = dn.getCacheRemaining(); assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Capacity not equal to used + remaining", cacheCapacity, cacheUsed + cacheRemaining); assertEquals("Remaining not equal to capacity - used", cacheCapacity - cacheUsed, cacheRemaining); totalUsed += cacheUsed; } assertEquals(expected*BLOCK_SIZE, totalUsed); // Uncache and check each path in sequence RemoteIterator<CacheDirectiveEntry> entries = new CacheDirectiveIterator(nnRpc, null, FsTracer.get(conf)); for (int i=0; i<numFiles; i++) { CacheDirectiveEntry entry = entries.next(); nnRpc.removeCacheDirective(entry.getInfo().getId()); expected -= numBlocksPerFile; waitForCachedBlocks(namenode, expected, expected, "testWaitForCachedReplicas:2"); } }
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( CacheDirectiveInfo filter) throws IOException { return new CacheDirectiveIterator(namenode, filter); }
@Test(timeout=120000) public void testWaitForCachedReplicas() throws Exception { FileSystemTestHelper helper = new FileSystemTestHelper(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { return ((namenode.getNamesystem().getCacheCapacity() == (NUM_DATANODES * CACHE_CAPACITY)) && (namenode.getNamesystem().getCacheUsed() == 0)); } }, 500, 60000); // Send a cache report referring to a bogus block. It is important that // the NameNode be robust against this. NamenodeProtocols nnRpc = namenode.getRpcServer(); DataNode dn0 = cluster.getDataNodes().get(0); String bpid = cluster.getNamesystem().getBlockPoolId(); LinkedList<Long> bogusBlockIds = new LinkedList<Long> (); bogusBlockIds.add(999999L); nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds); Path rootDir = helper.getDefaultWorkingDirectory(dfs); // Create the pool final String pool = "friendlyPool"; nnRpc.addCachePool(new CachePoolInfo("friendlyPool")); // Create some test files final int numFiles = 2; final int numBlocksPerFile = 2; final List<String> paths = new ArrayList<String>(numFiles); for (int i=0; i<numFiles; i++) { Path p = new Path(rootDir, "testCachePaths-" + i); FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile, (int)BLOCK_SIZE); paths.add(p.toUri().getPath()); } // Check the initial statistics at the namenode waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0"); // Cache and check each path in sequence int expected = 0; for (int i=0; i<numFiles; i++) { CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder(). setPath(new Path(paths.get(i))). setPool(pool). build(); nnRpc.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class)); expected += numBlocksPerFile; waitForCachedBlocks(namenode, expected, expected, "testWaitForCachedReplicas:1"); } // Check that the datanodes have the right cache values DatanodeInfo[] live = dfs.getDataNodeStats(DatanodeReportType.LIVE); assertEquals("Unexpected number of live nodes", NUM_DATANODES, live.length); long totalUsed = 0; for (DatanodeInfo dn : live) { final long cacheCapacity = dn.getCacheCapacity(); final long cacheUsed = dn.getCacheUsed(); final long cacheRemaining = dn.getCacheRemaining(); assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Capacity not equal to used + remaining", cacheCapacity, cacheUsed + cacheRemaining); assertEquals("Remaining not equal to capacity - used", cacheCapacity - cacheUsed, cacheRemaining); totalUsed += cacheUsed; } assertEquals(expected*BLOCK_SIZE, totalUsed); // Uncache and check each path in sequence RemoteIterator<CacheDirectiveEntry> entries = new CacheDirectiveIterator(nnRpc, null); for (int i=0; i<numFiles; i++) { CacheDirectiveEntry entry = entries.next(); nnRpc.removeCacheDirective(entry.getInfo().getId()); expected -= numBlocksPerFile; waitForCachedBlocks(namenode, expected, expected, "testWaitForCachedReplicas:2"); } }