public void setCachedLocations(LocatedBlock block) { CachedBlock cachedBlock = new CachedBlock(block.getBlock().getBlockId(), (short)0, false); cachedBlock = cachedBlocks.get(cachedBlock); if (cachedBlock == null) { return; } List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED); for (DatanodeDescriptor datanode : datanodes) { block.addCachedLoc(datanode); } }
public void setCachedLocations(LocatedBlock block) { CachedBlock cachedBlock = new CachedBlock(block.getBlock().getBlockId(), (short)0, false); cachedBlock = cachedBlocks.get(cachedBlock); if (cachedBlock == null) { return; } List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED); for (DatanodeDescriptor datanode : cachedDNs) { // Filter out cached blocks that do not have a backing replica. // // This should not happen since it means the CacheManager thinks // something is cached that does not exist, but it's a safety // measure. boolean found = false; for (DatanodeInfo loc : block.getLocations()) { if (loc.equals(datanode)) { block.addCachedLoc(loc); found = true; break; } } if (!found) { LOG.warn("Datanode {} is not a valid cache location for block {} " + "because that node does not have a backing replica!", datanode, block.getBlock().getBlockName()); } } }
/** * Wait for the NameNode to have an expected number of cached blocks * and replicas. * @param nn NameNode * @param expectedCachedBlocks if -1, treat as wildcard * @param expectedCachedReplicas if -1, treat as wildcard * @throws Exception */ private static void waitForCachedBlocks(NameNode nn, final int expectedCachedBlocks, final int expectedCachedReplicas, final String logString) throws Exception { final FSNamesystem namesystem = nn.getNamesystem(); final CacheManager cacheManager = namesystem.getCacheManager(); LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " + expectedCachedReplicas + " replicas."); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { int numCachedBlocks = 0, numCachedReplicas = 0; namesystem.readLock(); try { GSet<CachedBlock, CachedBlock> cachedBlocks = cacheManager.getCachedBlocks(); if (cachedBlocks != null) { for (Iterator<CachedBlock> iter = cachedBlocks.iterator(); iter.hasNext(); ) { CachedBlock cachedBlock = iter.next(); numCachedBlocks++; numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size(); } } } finally { namesystem.readUnlock(); } LOG.info(logString + " cached blocks: have " + numCachedBlocks + " / " + expectedCachedBlocks + ". " + "cached replicas: have " + numCachedReplicas + " / " + expectedCachedReplicas); if (expectedCachedBlocks == -1 || numCachedBlocks == expectedCachedBlocks) { if (expectedCachedReplicas == -1 || numCachedReplicas == expectedCachedReplicas) { return true; } } return false; } }, 500, 60000); }
/** * Wait for the NameNode to have an expected number of cached blocks * and replicas. * @param nn NameNode * @param expectedCachedBlocks if -1, treat as wildcard * @param expectedCachedReplicas if -1, treat as wildcard * @throws Exception */ private static void waitForCachedBlocks(NameNode nn, final int expectedCachedBlocks, final int expectedCachedReplicas, final String logString) throws Exception { final FSNamesystem namesystem = nn.getNamesystem(); final CacheManager cacheManager = namesystem.getCacheManager(); LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " + expectedCachedReplicas + " replicas."); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { int numCachedBlocks = 0, numCachedReplicas = 0; namesystem.readLock(); try { GSet<CachedBlock, CachedBlock> cachedBlocks = cacheManager.getCachedBlocks(); if (cachedBlocks != null) { for (Iterator<CachedBlock> iter = cachedBlocks.iterator(); iter.hasNext(); ) { CachedBlock cachedBlock = iter.next(); numCachedBlocks++; numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size(); } } } finally { namesystem.readUnlock(); } if (expectedCachedBlocks == -1 || numCachedBlocks == expectedCachedBlocks) { if (expectedCachedReplicas == -1 || numCachedReplicas == expectedCachedReplicas) { return true; } } LOG.info(logString + " cached blocks: have " + numCachedBlocks + " / " + expectedCachedBlocks + ". " + "cached replicas: have " + numCachedReplicas + " / " + expectedCachedReplicas); return false; } }, 500, 60000); }
/** * Get a list of the datanodes which this block is cached, * planned to be cached, or planned to be uncached on. * * @param type If null, this parameter is ignored. * If it is non-null, we match only datanodes which * have it on this list. * See {@link DatanodeDescriptor.CachedBlocksList.Type} * for a description of all the lists. * * @return The list of datanodes. Modifying this list does not * alter the state of the CachedBlock. */ public List<DatanodeDescriptor> getDatanodes(Type type) { List<DatanodeDescriptor> nodes = new LinkedList<DatanodeDescriptor>(); for (int i = 0; i < triplets.length; i += 3) { CachedBlocksList list = (CachedBlocksList)triplets[i]; if ((type == null) || (list.getType() == type)) { nodes.add(list.getDatanode()); } } return nodes; }
/** * Get a list of the datanodes which this block is cached, * planned to be cached, or planned to be uncached on. * * @param type If null, this parameter is ignored. * If it is non-null, we match only datanodes which * have it on this list. * See {@link DatanodeDescriptor#CachedBlocksList#Type} * for a description of all the lists. * * @return The list of datanodes. Modifying this list does not * alter the state of the CachedBlock. */ public List<DatanodeDescriptor> getDatanodes(Type type) { List<DatanodeDescriptor> nodes = new LinkedList<DatanodeDescriptor>(); for (int i = 0; i < triplets.length; i += 3) { CachedBlocksList list = (CachedBlocksList)triplets[i]; if ((type == null) || (list.getType() == type)) { nodes.add(list.getDatanode()); } } return nodes; }