Java class org.apache.hadoop.hdfs.util.LightWeightLinkedSet: example source code
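LightWeightLinkedSet is a low-memory, insertion-ordered hash set used throughout the HDFS NameNode; the snippets below show it backing the under-replicated block queues, excess-replica maps, and lease sets. As orientation, here is a minimal sketch of the API surface those snippets rely on (add, contains, remove, size, getBookmark, resetBookmark); exact signatures vary between the Hadoop forks listed, so treat this as illustrative only.

import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import java.util.Iterator;

public class LightWeightLinkedSetDemo {
  public static void main(String[] args) {
    // Insertion-ordered set; add() returns false for duplicates.
    LightWeightLinkedSet<String> set = new LightWeightLinkedSet<String>();
    set.add("blk_1");
    set.add("blk_2");
    System.out.println(set.contains("blk_1")); // true
    System.out.println(set.add("blk_1"));      // false: already present

    // Bookmark iteration (as used by UnderReplicatedBlocks below):
    // getBookmark() resumes where the previous iteration stopped.
    Iterator<String> it = set.getBookmark();
    if (it.hasNext()) {
      it.next();             // consumes "blk_1"; the bookmark advances
    }
    set.resetBookmark();     // rewind the bookmark to the head of the list

    set.remove("blk_2");
    System.out.println(set.size()); // 1
  }
}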

Project: hadoop-EAR    File: RaidCodec.java
/**
 * Count the number of live replicas of each parity block in the raided file.
 * If any stripe does not have enough parity block replicas, add the stripe
 * to raidEncodingTasks to schedule encoding.
 * If forceAdd is true, always add the stripe to raidEncodingTasks without
 * checking.
 * @param sourceINode
 * @param raidEncodingTasks
 * @param fs
 * @param forceAdd
 * @return true if all parity blocks of the file have enough replicas
 * @throws IOException
 */
public boolean checkRaidProgress(INodeFile sourceINode, 
    LightWeightLinkedSet<RaidBlockInfo> raidEncodingTasks, FSNamesystem fs,
    boolean forceAdd) throws IOException {
  boolean result = true;
  BlockInfo[] blocks = sourceINode.getBlocks();
  for (int i = 0; i < blocks.length;
      i += numStripeBlocks) {
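    // Each stripe spans numStripeBlocks entries; the loop below assumes the
    // first numParityBlocks of them are the stripe's parity blocks.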
    boolean hasParity = true;
    if (!forceAdd) {
      for (int j = 0; j < numParityBlocks; j++) {
        if (fs.countLiveNodes(blocks[i + j]) < this.parityReplication) {
          hasParity = false;
          break;
        }
      }
    }
    if (!hasParity || forceAdd) {
      raidEncodingTasks.add(new RaidBlockInfo(blocks[i], parityReplication, i));
      result = false; 
    }
  }
  return result;
}
Project: hadoop    File: BlockManager.java
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getDatanodeUuid());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (storage.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
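As a usage note, a caller typically compares the returned NumberReplicas against the expected replication factor. The following is a hypothetical helper, not taken from any of the projects above; it assumes NumberReplicas exposes liveReplicas(), as it does in mainline Hadoop.

/**
 * Hypothetical helper: decide whether a block still needs replication.
 * Assumes NumberReplicas#liveReplicas(), as in mainline Hadoop.
 */
boolean needsReplication(BlockManager blockManager, Block block,
    int expectedReplication) {
  NumberReplicas num = blockManager.countNodes(block);
  // Only NORMAL, non-corrupt, non-excess, non-decommissioned replicas
  // were counted as live above.
  return num.liveReplicas() < expectedReplication;
}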
Project: hadoop    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
Project: hadoop    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
  for(LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: aliyun-oss-hadoop-fs    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(BlockInfo block) {
  for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: aliyun-oss-hadoop-fs    File: UnderReplicatedBlocks.java
/**
 * Get a list of block lists to be replicated. The index of a block list
 * in the returned list is its replication priority. Iterates over each
 * block list in priority order, beginning with the highest priority list.
 * Iterators use a bookmark to resume where the previous iteration stopped.
 * Returns when the block count is met or iteration reaches the end of the
 * lowest priority list, in which case the bookmarks for all block lists
 * are reset to the heads of their respective lists.
 *
 * @param blocksToProcess the number of blocks to fetch from the
 *          under-replicated block queues.
 * @return a list of block lists to be replicated, indexed by replication
 *         priority.
 */
synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
    int blocksToProcess) {
  final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);

  int count = 0;
  int priority = 0;
  for (; count < blocksToProcess && priority < LEVEL; priority++) {
    if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
      // do not choose corrupted blocks.
      continue;
    }

    // Go through all blocks that need replication at the current priority.
    // Set the iterator to the first unprocessed block at this priority level.
    final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
    final List<BlockInfo> blocks = new LinkedList<>();
    blocksToReplicate.add(blocks);
    // Loop through all remaining blocks in the list.
    for(; count < blocksToProcess && i.hasNext(); count++) {
      blocks.add(i.next());
    }
  }

  if (priority == LEVEL) {
    // Reset all bookmarks because there were no recently added blocks.
    for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
      q.resetBookmark();
    }
  }

  return blocksToReplicate;
}
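The bookmark contract described above is easiest to see in isolation. A minimal sketch follows; blockA and blockB are hypothetical placeholders, not values from any of the projects listed.

// Minimal illustration of the bookmark contract (blockA/blockB are made up).
LightWeightLinkedSet<BlockInfo> queue = new LightWeightLinkedSet<>();
queue.add(blockA);
queue.add(blockB);

Iterator<BlockInfo> first = queue.getBookmark();
first.next();                  // consumes blockA; the bookmark advances

Iterator<BlockInfo> second = queue.getBookmark();
second.next();                 // resumes at blockB, not back at blockA

queue.resetBookmark();         // next getBookmark() starts from blockA again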
Project: aliyun-oss-hadoop-fs    File: UnderReplicatedBlocks.java
/** Return an iterator over all under-replicated blocks. */
@Override
public synchronized Iterator<BlockInfo> iterator() {
  final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
  return new Iterator<BlockInfo>() {
    private Iterator<BlockInfo> b = q.next().iterator();

    @Override
    public BlockInfo next() {
      hasNext();
      return b.next();
    }

    @Override
    public boolean hasNext() {
      while (!b.hasNext() && q.hasNext()) {
        b = q.next().iterator();
      }
      return b.hasNext();
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  };
}
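Since iterator() is an @Override, UnderReplicatedBlocks evidently implements Iterable<BlockInfo>, so callers can walk every queued block across all priority levels with a plain for-each loop. A hedged sketch; neededReplications is the conventional field name for this structure in BlockManager, assumed here.

// Walk all under-replicated blocks, highest priority first.
// Assumes a field: final UnderReplicatedBlocks neededReplications;
for (BlockInfo block : neededReplications) {
  System.out.println("queued for replication: " + block);
}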
Project: big-c    File: BlockManager.java
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getDatanodeUuid());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (storage.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
Project: big-c    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
Project: big-c    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
  for(LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: hadoop-2.6.0-cdh5.4.3    File: BlockManager.java
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getDatanodeUuid());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (storage.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
Project: hadoop-2.6.0-cdh5.4.3    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
  for(LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: hadoop-EAR    File: TestBlockReplicationQueue.java
public void testLightWeightLinkedSetBenchmark() {
  LOG.info("Test LIGHTWEIGHT_LINKED_SET");
  queueL = new LightWeightLinkedSet<Block>();
  insertBlocks(true);
  containsBlocks(true);
  removeBlocks(true);
}
Project: hadoop-EAR    File: TestLinkedHashSet.java
protected void setUp() {
  float maxF = LightWeightLinkedSet.rMaxLoadFactor;
  float minF = LightWeightLinkedSet.rMinLoadFactor;
  int initCapacity = LightWeightLinkedSet.MINIMUM_CAPACITY;
  rand = new Random(System.currentTimeMillis());
  list.clear();
  for (int i = 0; i < NUM; i++) {
    list.add(rand.nextInt());
  }
  set = new LightWeightLinkedSet<Integer>(initCapacity, maxF, minF);
}
Project: hadoop-plus    File: BlockManager.java
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  while (nodeIter.hasNext()) {
    DatanodeDescriptor node = nodeIter.next();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getStorageID());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (node.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
Project: hadoop-plus    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
Project: hadoop-plus    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
  for(LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: FlexMap    File: BlockManager.java
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getDatanodeUuid());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (storage.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
Project: FlexMap    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
Project: FlexMap    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
  for(LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: hops    File: ExcessReplicasMap.java
public LightWeightLinkedSet<Block> get(String dn) throws IOException {
  Collection<ExcessReplica> excessReplicas =
      getExcessReplicas(datanodeManager.getDatanode(dn).getSId());
  if (excessReplicas == null) {
    return null;
  }
  LightWeightLinkedSet<Block> excessBlocks =
      new LightWeightLinkedSet<>();
  for (ExcessReplica er : excessReplicas) {
    //FIXME: [M] might need to get the blockinfo from the db, but for now we don't need it
    excessBlocks.add(new Block(er.getBlockId()));
  }
  return excessBlocks;
}
Project: hadoop-TCP    File: BlockManager.java
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  while (nodeIter.hasNext()) {
    DatanodeDescriptor node = nodeIter.next();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getStorageID());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (node.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
Project: hadoop-TCP    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
Project: hadoop-TCP    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
  for(LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: hardfs    File: BlockManager.java
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  while (nodeIter.hasNext()) {
    DatanodeDescriptor node = nodeIter.next();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getStorageID());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (node.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
Project: hardfs    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
Project: hardfs    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
  for(LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: hadoop-on-lustre2    File: BlockManager.java
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getDatanodeUuid());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (storage.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
Project: hadoop-on-lustre2    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
Project: hadoop-on-lustre2    File: UnderReplicatedBlocks.java
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
  for(LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
Project: RDFS    File: TestBlockReplicationQueue.java
public void testLightWeightLinkedSetBenchmark() {
  LOG.info("Test LIGHTWEIGHT_LINKED_SET");
  queueL = new LightWeightLinkedSet<Block>();
  insertBlocks(true);
  containsBlocks(true);
  removeBlocks(true);
}
Project: RDFS    File: TestLinkedHashSet.java
protected void setUp() {
  float maxF = LightWeightLinkedSet.rMaxLoadFactor;
  float minF = LightWeightLinkedSet.rMinLoadFactor;
  int initCapacity = LightWeightLinkedSet.MINIMUM_CAPACITY;
  rand = new Random(System.currentTimeMillis());
  list.clear();
  for (int i = 0; i < NUM; i++) {
    list.add(rand.nextInt());
  }
  set = new LightWeightLinkedSet<Integer>(initCapacity, maxF, minF);
}
Project: RDFS    File: FSNamesystem.java
/**
 * Serializes leases.
 */
void saveFilesUnderConstruction(SaveNamespaceContext ctx, DataOutputStream out) throws IOException {
  synchronized (leaseManager) {
    out.writeInt(leaseManager.countPath()); // write the size

    LightWeightLinkedSet<Lease> sortedLeases = leaseManager.getSortedLeases();
    Iterator<Lease> itr = sortedLeases.iterator();
    while (itr.hasNext()) {
      ctx.checkCancelled();
      Lease lease = itr.next();
      for (String path : lease.getPaths()) {
        // verify that path exists in namespace
        INode node = dir.getFileINode(path);
        if (node == null) {
          throw new IOException("saveLeases found path " + path +
            " but no matching entry in namespace.");
        }
        if (!node.isUnderConstruction()) {
          throw new IOException("saveLeases found path " + path +
            " but is not under construction.");
        }
        INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
        FSImageSerialization.writeINodeUnderConstruction(out, cons, path);
      }
    }
  }
}
Project: hadoop    File: BlockManager.java
/**
 * Modify (block-->datanode) map. Possibly generate replication tasks, if the
 * removed block is still valid.
 */
public void removeStoredBlock(Block block, DatanodeDescriptor node) {
  blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
  assert (namesystem.hasWriteLock());
  {
    if (!blocksMap.removeNode(block, node)) {
      blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
          " removed from node {}", block, node);
      return;
    }

    //
    // It's possible that the block was removed because of a datanode
    // failure. If the block is still valid, check if replication is
    // necessary. In that case, put block on a possibly-will-
    // be-replicated list.
    //
    BlockCollection bc = blocksMap.getBlockCollection(block);
    if (bc != null) {
      namesystem.decrementSafeBlockCount(block);
      updateNeededReplications(block, -1, 0);
    }

    //
    // We've removed a block from a node, so it's definitely no longer
    // in "excess" there.
    //
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
        .getDatanodeUuid());
    if (excessBlocks != null) {
      if (excessBlocks.remove(block)) {
        excessBlocksCount.decrementAndGet();
        blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
            "excessBlocks", block);
        if (excessBlocks.size() == 0) {
          excessReplicateMap.remove(node.getDatanodeUuid());
        }
      }
    }

    // Remove the replica from corruptReplicas
    corruptReplicas.removeFromCorruptReplicasMap(block, node);
  }
}
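The snippet above removes a block from the per-datanode excess set and discards the set once it is empty. The inverse pattern, creating the set lazily on first insert, looks roughly like the following sketch, modeled on BlockManager#addToExcessReplicate in mainline Hadoop; the field names are assumptions.

// Sketch of the inverse operation: record an excess replica for a node.
// Modeled on BlockManager#addToExcessReplicate; field names assumed.
private void addToExcessReplicate(DatanodeInfo dn, Block block) {
  assert namesystem.hasWriteLock();
  LightWeightLinkedSet<Block> excessBlocks =
      excessReplicateMap.get(dn.getDatanodeUuid());
  if (excessBlocks == null) {
    excessBlocks = new LightWeightLinkedSet<Block>();
    excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
  }
  if (excessBlocks.add(block)) {
    excessBlocksCount.incrementAndGet();
  }
}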
Project: aliyun-oss-hadoop-fs    File: UnderReplicatedBlocks.java
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
  }
}
Project: big-c    File: BlockManager.java
/**
 * Modify (block-->datanode) map. Possibly generate replication tasks, if the
 * removed block is still valid.
 */
public void removeStoredBlock(Block block, DatanodeDescriptor node) {
  blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
  assert (namesystem.hasWriteLock());
  {
    if (!blocksMap.removeNode(block, node)) {
      blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
          " removed from node {}", block, node);
      return;
    }

    //
    // It's possible that the block was removed because of a datanode
    // failure. If the block is still valid, check if replication is
    // necessary. In that case, put block on a possibly-will-
    // be-replicated list.
    //
    BlockCollection bc = blocksMap.getBlockCollection(block);
    if (bc != null) {
      namesystem.decrementSafeBlockCount(block);
      updateNeededReplications(block, -1, 0);
    }

    //
    // We've removed a block from a node, so it's definitely no longer
    // in "excess" there.
    //
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
        .getDatanodeUuid());
    if (excessBlocks != null) {
      if (excessBlocks.remove(block)) {
        excessBlocksCount.decrementAndGet();
        blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
            "excessBlocks", block);
        if (excessBlocks.size() == 0) {
          excessReplicateMap.remove(node.getDatanodeUuid());
        }
      }
    }

    // Remove the replica from corruptReplicas
    corruptReplicas.removeFromCorruptReplicasMap(block, node);
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: BlockManager.java
/**
 * Modify (block-->datanode) map. Possibly generate replication tasks, if the
 * removed block is still valid.
 */
public void removeStoredBlock(Block block, DatanodeDescriptor node) {
  blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
  assert (namesystem.hasWriteLock());
  {
    if (!blocksMap.removeNode(block, node)) {
      blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
          " removed from node {}", block, node);
      return;
    }

    //
    // It's possible that the block was removed because of a datanode
    // failure. If the block is still valid, check if replication is
    // necessary. In that case, put block on a possibly-will-
    // be-replicated list.
    //
    BlockCollection bc = blocksMap.getBlockCollection(block);
    if (bc != null) {
      namesystem.decrementSafeBlockCount(block);
      updateNeededReplications(block, -1, 0);
    }

    //
    // We've removed a block from a node, so it's definitely no longer
    // in "excess" there.
    //
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
        .getDatanodeUuid());
    if (excessBlocks != null) {
      if (excessBlocks.remove(block)) {
        excessBlocksCount.decrementAndGet();
        blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
            "excessBlocks", block);
        if (excessBlocks.size() == 0) {
          excessReplicateMap.remove(node.getDatanodeUuid());
        }
      }
    }

    // Remove the replica from corruptReplicas
    corruptReplicas.removeFromCorruptReplicasMap(block, node);
  }
}
Project: hadoop-EAR    File: SnapshotNode.java
/**
 * Tries to get the most up-to-date lengths of files under construction.
 */
void updateLeasedFiles(SnapshotStorage ssStore) throws IOException {
  FSNamesystem fsNamesys = ssStore.getFSNamesystem();
  List<Block> blocksForNN = new ArrayList<Block>();

  leaseUpdateThreadPool = new ThreadPoolExecutor(1, maxLeaseUpdateThreads, 60, 
                                              TimeUnit.SECONDS,
                                              new LinkedBlockingQueue<Runnable>());
  ((ThreadPoolExecutor)leaseUpdateThreadPool).allowCoreThreadTimeOut(true);

  // Try to update lengths for leases from DN
  LightWeightLinkedSet<Lease> sortedLeases = fsNamesys.leaseManager.getSortedLeases();
  Iterator<Lease> itr = sortedLeases.iterator();
  while (itr.hasNext()) {
    Lease lease = itr.next();
    for (String path : lease.getPaths()) {
      // Update file lengths using worker threads to increase throughput
      leaseUpdateThreadPool.execute(
                 new LeaseUpdateWorker(conf, path, fsNamesys, blocksForNN));
    }
  }

  try {
    leaseUpdateThreadPool.shutdown();
    // Wait till update tasks finish successfully (max 20 mins?)
    if (!leaseUpdateThreadPool.awaitTermination(1200, TimeUnit.SECONDS)) {
      throw new IOException("Updating lease files failed");
    }
  } catch (InterruptedException e) {
      throw new IOException("Snapshot creation interrupted while updating leased files");
  }

  // Fetch block lengths for renamed/deleted leases from NN
  long[] blockIds = new long[blocksForNN.size()];

  for (int i = 0; i < blocksForNN.size(); ++i) {
    blockIds[i] = blocksForNN.get(i).getBlockId();
  }

  long[] lengths = namenode.getBlockLengths(blockIds);

  for (int i = 0; i < blocksForNN.size(); ++i) {
    if (lengths[i] == -1) {
      // Couldn't update block length, keep preferred length
      LOG.error("Couldn't update length for block " + blocksForNN.get(i));
    } else {
      blocksForNN.get(i).setNumBytes(lengths[i]);
    }
  }
}
Project: hadoop-EAR    File: UnderReplicatedBlocks.java
UnderReplicatedBlocks() {
  for(int i=0; i<LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
  }
  raidQueue =  new RaidMissingBlocks();
}