/** * Convert a CachedBlockList into a DatanodeCommand with a list of blocks. * * @param list The {@link CachedBlocksList}. This function * clears the list. * @param datanode The datanode. * @param action The action to perform in the command. * @param poolId The block pool id. * @return A DatanodeCommand to be sent back to the DN, or null if * there is nothing to be done. */ private DatanodeCommand getCacheCommand(CachedBlocksList list, DatanodeDescriptor datanode, int action, String poolId) { int length = list.size(); if (length == 0) { return null; } // Read the existing cache commands. long[] blockIds = new long[length]; int i = 0; for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext(); ) { CachedBlock cachedBlock = iter.next(); blockIds[i++] = cachedBlock.getBlockId(); } return new BlockIdCommand(action, poolId, blockIds); }
private String findReasonForNotCaching(CachedBlock cblock, BlockInfoContiguous blockInfo) { if (blockInfo == null) { // Somehow, a cache report with the block arrived, but the block // reports from the DataNode haven't (yet?) described such a block. // Alternately, the NameNode might have invalidated the block, but the // DataNode hasn't caught up. In any case, we want to tell the DN // to uncache this. return "not tracked by the BlockManager"; } else if (!blockInfo.isComplete()) { // When a cached block changes state from complete to some other state // on the DataNode (perhaps because of append), it will begin the // uncaching process. However, the uncaching process is not // instantaneous, especially if clients have pinned the block. So // there may be a period of time when incomplete blocks remain cached // on the DataNodes. return "not complete"; } else if (cblock.getReplication() == 0) { // Since 0 is not a valid value for a cache directive's replication // field, seeing a replication of 0 on a CacheBlock means that it // has never been reached by any sweep. return "not needed by any directives"; } else if (cblock.getMark() != mark) { // Although the block was needed in the past, we didn't reach it during // the current sweep. Therefore, it doesn't need to be cached any more. // Need to set the replication to 0 so it doesn't flip back to cached // when the mark flips on the next scan cblock.setReplicationAndMark((short)0, mark); return "no longer needed by any directives"; } return null; }
private void testAddElementsToList(CachedBlocksList list, CachedBlock[] blocks) { Assert.assertTrue("expected list to start off empty.", !list.iterator().hasNext()); for (CachedBlock block : blocks) { Assert.assertTrue(list.add(block)); } }
private void testRemoveElementsFromList(Random r, CachedBlocksList list, CachedBlock[] blocks) { int i = 0; for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext(); ) { Assert.assertEquals(blocks[i], iter.next()); i++; } if (r.nextBoolean()) { LOG.info("Removing via iterator"); for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext() ;) { iter.next(); iter.remove(); } } else { LOG.info("Removing in pseudo-random order"); CachedBlock[] remainingBlocks = Arrays.copyOf(blocks, blocks.length); for (int removed = 0; removed < remainingBlocks.length; ) { int toRemove = r.nextInt(remainingBlocks.length); if (remainingBlocks[toRemove] != null) { Assert.assertTrue(list.remove(remainingBlocks[toRemove])); remainingBlocks[toRemove] = null; removed++; } } } Assert.assertTrue("expected list to be empty after everything " + "was removed.", !list.iterator().hasNext()); }
/** * Convert a CachedBlockList into a DatanodeCommand with a list of blocks. * * @param list The {@link CachedBlocksList}. This function * clears the list. * @param action The action to perform in the command. * @param poolId The block pool id. * @return A DatanodeCommand to be sent back to the DN, or null if * there is nothing to be done. */ private DatanodeCommand getCacheCommand(CachedBlocksList list, int action, String poolId) { int length = list.size(); if (length == 0) { return null; } // Read the existing cache commands. long[] blockIds = new long[length]; int i = 0; for (CachedBlock cachedBlock : list) { blockIds[i++] = cachedBlock.getBlockId(); } return new BlockIdCommand(action, poolId, blockIds); }
private String findReasonForNotCaching(CachedBlock cblock, BlockInfo blockInfo) { if (blockInfo == null) { // Somehow, a cache report with the block arrived, but the block // reports from the DataNode haven't (yet?) described such a block. // Alternately, the NameNode might have invalidated the block, but the // DataNode hasn't caught up. In any case, we want to tell the DN // to uncache this. return "not tracked by the BlockManager"; } else if (!blockInfo.isComplete()) { // When a cached block changes state from complete to some other state // on the DataNode (perhaps because of append), it will begin the // uncaching process. However, the uncaching process is not // instantaneous, especially if clients have pinned the block. So // there may be a period of time when incomplete blocks remain cached // on the DataNodes. return "not complete"; } else if (cblock.getReplication() == 0) { // Since 0 is not a valid value for a cache directive's replication // field, seeing a replication of 0 on a CacheBlock means that it // has never been reached by any sweep. return "not needed by any directives"; } else if (cblock.getMark() != mark) { // Although the block was needed in the past, we didn't reach it during // the current sweep. Therefore, it doesn't need to be cached any more. // Need to set the replication to 0 so it doesn't flip back to cached // when the mark flips on the next scan cblock.setReplicationAndMark((short)0, mark); return "no longer needed by any directives"; } return null; }
@Test(timeout=60000) public void testSingleList() { DatanodeDescriptor dn = new DatanodeDescriptor( new DatanodeID("127.0.0.1", "localhost", "abcd", 5000, 5001, 5002, 5003)); CachedBlock[] blocks = new CachedBlock[] { new CachedBlock(0L, (short)1, true), new CachedBlock(1L, (short)1, true), new CachedBlock(2L, (short)1, true), }; // check that lists are empty Assert.assertTrue("expected pending cached list to start off empty.", !dn.getPendingCached().iterator().hasNext()); Assert.assertTrue("expected cached list to start off empty.", !dn.getCached().iterator().hasNext()); Assert.assertTrue("expected pending uncached list to start off empty.", !dn.getPendingUncached().iterator().hasNext()); // add a block to the back Assert.assertTrue(dn.getCached().add(blocks[0])); Assert.assertTrue("expected pending cached list to still be empty.", !dn.getPendingCached().iterator().hasNext()); Assert.assertEquals("failed to insert blocks[0]", blocks[0], dn.getCached().iterator().next()); Assert.assertTrue("expected pending uncached list to still be empty.", !dn.getPendingUncached().iterator().hasNext()); // add another block to the back Assert.assertTrue(dn.getCached().add(blocks[1])); Iterator<CachedBlock> iter = dn.getCached().iterator(); Assert.assertEquals(blocks[0], iter.next()); Assert.assertEquals(blocks[1], iter.next()); Assert.assertTrue(!iter.hasNext()); // add a block to the front Assert.assertTrue(dn.getCached().addFirst(blocks[2])); iter = dn.getCached().iterator(); Assert.assertEquals(blocks[2], iter.next()); Assert.assertEquals(blocks[0], iter.next()); Assert.assertEquals(blocks[1], iter.next()); Assert.assertTrue(!iter.hasNext()); // remove a block from the middle Assert.assertTrue(dn.getCached().remove(blocks[0])); iter = dn.getCached().iterator(); Assert.assertEquals(blocks[2], iter.next()); Assert.assertEquals(blocks[1], iter.next()); Assert.assertTrue(!iter.hasNext()); // remove all blocks dn.getCached().clear(); Assert.assertTrue("expected cached list to be empty after clear.", !dn.getPendingCached().iterator().hasNext()); }
/** * Modify (block-->datanode) map. Possibly generate replication tasks, if the * removed block is still valid. */ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) { blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node); assert (namesystem.hasWriteLock()); { if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) { blockLog.debug("BLOCK* removeStoredBlock: {} has already been" + " removed from node {}", storedBlock, node); return; } CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks() .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false)); if (cblock != null) { boolean removed = false; removed |= node.getPendingCached().remove(cblock); removed |= node.getCached().remove(cblock); removed |= node.getPendingUncached().remove(cblock); if (removed) { blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching " + "related lists on node {}", storedBlock, node); } } // // It's possible that the block was removed because of a datanode // failure. If the block is still valid, check if replication is // necessary. In that case, put block on a possibly-will- // be-replicated list. // BlockCollection bc = getBlockCollection(storedBlock); if (bc != null) { bmSafeMode.decrementSafeBlockCount(storedBlock); updateNeededReplications(storedBlock, -1, 0); } // // We've removed a block from a node, so it's definitely no longer // in "excess" there. // LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get( node.getDatanodeUuid()); if (excessBlocks != null) { if (excessBlocks.remove(storedBlock)) { excessBlocksCount.decrementAndGet(); blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " + "excessBlocks", storedBlock); if (excessBlocks.size() == 0) { excessReplicateMap.remove(node.getDatanodeUuid()); } } } // Remove the replica from corruptReplicas corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node); } }