Java class org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas example source code
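NumberReplicas is a small holder object that FSNamesystem and BlockManager use to report per-block replica counts. The exact definition varies across the projects listed below; the following is only a minimal sketch reconstructed from the constructors and accessors used in these examples (liveReplicas, decommissionedReplicas, corruptReplicas, excessReplicas), not the verbatim Hadoop source.

// A hypothetical sketch of FSNamesystem.NumberReplicas, reconstructed from how
// the examples below construct and read it; the real class may differ per version.
static class NumberReplicas {
  private int liveReplicas;
  private int decommissionedReplicas;
  private int corruptReplicas;
  private int excessReplicas;

  NumberReplicas() {
    initialize(0, 0, 0, 0);
  }

  NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
    initialize(live, decommissioned, corrupt, excess);
  }

  void initialize(int live, int decommissioned, int corrupt, int excess) {
    this.liveReplicas = live;
    this.decommissionedReplicas = decommissioned;
    this.corruptReplicas = corrupt;
    this.excessReplicas = excess;
  }

  int liveReplicas()           { return liveReplicas; }
  int decommissionedReplicas() { return decommissionedReplicas; }
  int corruptReplicas()        { return corruptReplicas; }
  int excessReplicas()         { return excessReplicas; }
}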

Project: hadoop-EAR    File: TestUnderReplicatedBlocks.java
private void waitForExcessReplicasToChange(
  FSNamesystem namesystem,
  Block block,
  int newReplicas) throws Exception
{
  NumberReplicas num;
  long startChecking = System.currentTimeMillis();
  do {
    LOG.info("Waiting for a replica to become excess");
    namesystem.readLock();
    try {
      num = namesystem.countNodes(block);
    } finally {
      namesystem.readUnlock();
    }
    Thread.sleep(100);
    if (System.currentTimeMillis() - startChecking > 30000) {
      fail("Timed out waiting for excess replicas to change");
    }

  } while (num.excessReplicas() != newReplicas);
}
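The wait helpers in these test snippets all repeat the same pattern: poll namesystem.countNodes(block) under the namesystem read lock every 100 ms and fail after 30 seconds. As a hedged illustration only (the ReplicaCondition interface and the waitForReplicaCondition name are not part of the original tests), the shared pattern could be factored out like this:

// Hypothetical refactoring of the poll-under-read-lock pattern shared by the
// wait helpers in these tests; assumes it lives inside the same JUnit test
// class, so fail() is in scope. Names here are illustrative only.
interface ReplicaCondition {
  boolean isMet(NumberReplicas num);
}

private void waitForReplicaCondition(FSNamesystem namesystem, Block block,
    ReplicaCondition condition, String failureMessage) throws Exception {
  NumberReplicas num;
  long startChecking = System.currentTimeMillis();
  do {
    namesystem.readLock();
    try {
      num = namesystem.countNodes(block);
    } finally {
      namesystem.readUnlock();
    }
    Thread.sleep(100);
    if (System.currentTimeMillis() - startChecking > 30000) {
      fail(failureMessage);
    }
  } while (!condition.isMet(num));
}

// waitForExcessReplicasToChange would then reduce to passing a condition
// that checks num.excessReplicas() == newReplicas.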
Project: hadoop-EAR    File: TestUnderReplicatedBlocks.java
private void waitForExcessReplicasToBeDeleted(FSNamesystem namesystem,
    Block block, DataNode dn) throws Exception {
  NumberReplicas num;
  long startChecking = System.currentTimeMillis();
  do {
    LOG.info("Waiting for the excess replica to be deleted");
    dn.scheduleNSBlockReceivedAndDeleted(0);
    namesystem.readLock();
    try {
      num = namesystem.countNodes(block);
    } finally {
      namesystem.readUnlock();
    }
    Thread.sleep(100);
    if (System.currentTimeMillis() - startChecking > 30000) {
      fail("Timed out waiting for excess replicas to be deleted");
    }

  } while (num.excessReplicas() != 0);
}
Project: hadoop-EAR    File: TestNodeCount.java
private void waitForExcessReplicasToChange(
  FSNamesystem namesystem,
  Block block,
  int newReplicas) throws Exception
{
  NumberReplicas num;
  long startChecking = System.currentTimeMillis();
  do {
    namesystem.readLock();
    try {
      num = namesystem.countNodes(block);
      LOG.info("We have " + num.excessReplicas() + " excess replica");
    } finally {
      namesystem.readUnlock();
    }
    Thread.sleep(100);
    if (System.currentTimeMillis() - startChecking > 30000) {
      namesystem.metaSave("TestNodeCount.meta");
      LOG.warn("Dumping meta into log directory");
      fail("Timed out waiting for excess replicas to change");
    }

  } while (num.excessReplicas() != newReplicas);
}
Project: cumulus    File: BlockManager.java
/**
 * If there were any replication requests that timed out, reap them
 * and put them back into the neededReplication queue
 */
void processPendingReplications() {
  Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
  if (timedOutItems != null) {
    namesystem.writeLock();
    try {
      for (int i = 0; i < timedOutItems.length; i++) {
        NumberReplicas num = countNodes(timedOutItems[i]);
        if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
                               num.liveReplicas())) {
          neededReplications.add(timedOutItems[i],
                                 num.liveReplicas(),
                                 num.decommissionedReplicas(),
                                 getReplication(timedOutItems[i]));
        }
      }
    } finally {
      namesystem.writeUnlock();
    }
    /* If we know the target datanodes where the replication timedout,
     * we could invoke decBlocksScheduled() on it. Its ok for now.
     */
  }
}
Project: cumulus    File: BlockManager.java
/**
 * Return the number of nodes that are live and decommissioned.
 */
NumberReplicas countNodes(Block b) {
  int count = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  while (nodeIter.hasNext()) {
    DatanodeDescriptor node = nodeIter.next();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      count++;
    } else {
      Collection<Block> blocksExcess =
        excessReplicateMap.get(node.getStorageID());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
  }
  return new NumberReplicas(live, count, corrupt, excess);
}
Project: cumulus    File: BlockManager.java
private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
    NumberReplicas num) {
  int curReplicas = num.liveReplicas();
  int curExpectedReplicas = getReplication(block);
  INode fileINode = blocksMap.getINode(block);
  Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
  StringBuilder nodeList = new StringBuilder();
  while (nodeIter.hasNext()) {
    DatanodeDescriptor node = nodeIter.next();
    nodeList.append(node.name);
    nodeList.append(" ");
  }
  FSNamesystem.LOG.info("Block: " + block + ", Expected Replicas: "
      + curExpectedReplicas + ", live replicas: " + curReplicas
      + ", corrupt replicas: " + num.corruptReplicas()
      + ", decommissioned replicas: " + num.decommissionedReplicas()
      + ", excess replicas: " + num.excessReplicas()
      + ", Is Open File: " + fileINode.isUnderConstruction()
      + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
      + srcNode.name + ", Is current datanode decommissioning: "
      + srcNode.isDecommissionInProgress());
}
Project: cumulus    File: BlockManager.java
void updateNeededReplications(Block block, int curReplicasDelta,
    int expectedReplicasDelta) {
  namesystem.writeLock();
  try {
    NumberReplicas repl = countNodes(block);
    int curExpectedReplicas = getReplication(block);
    if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
      neededReplications.update(block, repl.liveReplicas(), repl
          .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
          expectedReplicasDelta);
    } else {
      int oldReplicas = repl.liveReplicas()-curReplicasDelta;
      int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
      neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
                                oldExpectedReplicas);
    }
  } finally {
    namesystem.writeUnlock();
  }
}
Project: RDFS    File: TestUnderReplicatedBlocks.java
private void waitForExcessReplicasToChange(
  FSNamesystem namesystem,
  Block block,
  int newReplicas) throws Exception
{
  NumberReplicas num;
  long startChecking = System.currentTimeMillis();
  do {
    namesystem.readLock();
    try {
      num = namesystem.countNodes(block);
    } finally {
      namesystem.readUnlock();
    }
    Thread.sleep(100);
    if (System.currentTimeMillis() - startChecking > 30000) {
      fail("Timed out waiting for excess replicas to change");
    }

  } while (num.excessReplicas() != newReplicas);
}
Project: RDFS    File: TestNodeCount.java
private void waitForExcessReplicasToChange(
  FSNamesystem namesystem,
  Block block,
  int newReplicas) throws Exception
{
  NumberReplicas num;
  long startChecking = System.currentTimeMillis();
  do {
    namesystem.readLock();
    try {
      num = namesystem.countNodes(block);
      LOG.info("We have " + num.excessReplicas() + " excess replica");
    } finally {
      namesystem.readUnlock();
    }
    Thread.sleep(100);
    if (System.currentTimeMillis() - startChecking > 30000) {
      namesystem.metaSave("TestNodeCount.meta");
      LOG.warn("Dumping meta into log directory");
      fail("Timed out waiting for excess replicas to change");
    }

  } while (num.excessReplicas() != newReplicas);
}
Project: cumulus    File: BlockManager.java
/**
 * For each block in the name-node verify whether it belongs to any file,
 * over or under replicated. Place it into the respective queue.
 */
void processMisReplicatedBlocks() {
  long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
  namesystem.writeLock();
  try {
    neededReplications.clear();
    for (BlockInfo block : blocksMap.getBlocks()) {
      INodeFile fileINode = block.getINode();
      if (fileINode == null) {
        // block does not belong to any file
        nrInvalid++;
        addToInvalidates(block);
        continue;
      }
      // calculate current replication
      short expectedReplication = fileINode.getReplication();
      NumberReplicas num = countNodes(block);
      int numCurrentReplica = num.liveReplicas();
      // add to under-replicated queue if need to be
      if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
        if (neededReplications.add(block, numCurrentReplica, num
            .decommissionedReplicas(), expectedReplication)) {
          nrUnderReplicated++;
        }
      }

      if (numCurrentReplica > expectedReplication) {
        // over-replicated block
        nrOverReplicated++;
        processOverReplicatedBlock(block, expectedReplication, null, null);
      }
    }
  } finally {
    namesystem.writeUnlock();
  }
  FSNamesystem.LOG.info("Total number of blocks = " + blocksMap.size());
  FSNamesystem.LOG.info("Number of invalid blocks = " + nrInvalid);
  FSNamesystem.LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
  FSNamesystem.LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
}
Project: cumulus    File: BlockManager.java
void checkReplication(Block block, int numExpectedReplicas) {
  // filter out containingNodes that are marked for decommission.
  NumberReplicas number = countNodes(block);
  if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) { 
    neededReplications.add(block,
                           number.liveReplicas(),
                           number.decommissionedReplicas(),
                           numExpectedReplicas);
  }
}
Project: hadoop-EAR    File: TestUnderReplicatedBlocks.java
public void testUnderReplicationWithDecommissionDataNode() throws Exception {
  final Configuration conf = new Configuration();
  final short REPLICATION_FACTOR = (short)1;
  File f = new File(HOST_FILE_PATH);
  if (f.exists()) {
    f.delete();
  }
  f.createNewFile();
  conf.set("dfs.hosts.exclude", HOST_FILE_PATH);
  LOG.info("Start the cluster");
  final MiniDFSCluster cluster = 
    new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
  try {
    final FSNamesystem namesystem = cluster.getNameNode().namesystem;
    final FileSystem fs = cluster.getFileSystem();
    DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
          namesystem.heartbeats.toArray(
              new DatanodeDescriptor[REPLICATION_FACTOR]);
    assertEquals(1, datanodes.length);
    // populate the cluster with a one block file
    final Path FILE_PATH = new Path("/testfile2");
    DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
    DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
    Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);

    // shutdown the datanode
    DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]);
    assertEquals(1, namesystem.getMissingBlocksCount()); // one missing block
    assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());

    // Make the only datanode to be decommissioned
    LOG.info("Decommission the datanode " + dnprop);
    addToExcludeFile(namesystem.getConf(), datanodes);
    namesystem.refreshNodes(namesystem.getConf());      

    // bring up the datanode
    cluster.restartDataNode(dnprop);

    // Wait for block report
    LOG.info("wait for its block report to come in");
    NumberReplicas num;
    long startTime = System.currentTimeMillis();
    do {
     namesystem.readLock();
     try {
       num = namesystem.countNodes(block);
     } finally {
       namesystem.readUnlock();
     }
     Thread.sleep(1000);
     LOG.info("live: " + num.liveReplicas() 
         + "Decom: " + num.decommissionedReplicas());
    } while (num.decommissionedReplicas() != 1 &&
        System.currentTimeMillis() - startTime < 30000);
    assertEquals("Decommissioning Replicas doesn't reach 1", 
        1, num.decommissionedReplicas());
    assertEquals(1, namesystem.getNonCorruptUnderReplicatedBlocks());
    assertEquals(0, namesystem.getMissingBlocksCount());
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop-EAR    File: TestRaidMissingBlocksQueue.java
/**
 * Take down a datanode to generate raid missing blocks, and then bring it back
 * will restore the missing blocks.
 */
@Test
public void testRaidMissingBlocksByTakingDownDataNode() throws IOException, InterruptedException {
  MiniDFSCluster cluster = null;
  Configuration conf = new Configuration();
  try {
    cluster = new MiniDFSCluster(conf, 1, true, null);
    final FSNamesystem namesystem = cluster.getNameNode().namesystem;
    final DistributedFileSystem dfs = DFSUtil.convertToDFS(cluster.getFileSystem());
    String filePath = "/test/file1";
    RaidCodec rsCodec = RaidCodec.getCodec("rs");
    RaidDFSUtil.constructFakeRaidFile(dfs, filePath, rsCodec);

    DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
        namesystem.heartbeats.toArray(
            new DatanodeDescriptor[1]);
    assertEquals(1, datanodes.length);

    // shutdown the datanode
    DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]);
    assertEquals(rsCodec.numStripeBlocks, namesystem.getRaidMissingBlocksCount());
    assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing block
    assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());

    // bring up the datanode
    cluster.restartDataNode(dnprop);

    // Wait for block report
    LOG.info("wait for its block report to come in");
    NumberReplicas num;
    FileStatus stat = dfs.getFileStatus(new Path(filePath));
    LocatedBlocks blocks = dfs.getClient().
        getLocatedBlocks(filePath, 0, stat.getLen());
    long startTime = System.currentTimeMillis();
    do {
      Thread.sleep(1000);
      int totalCount = 0;
      namesystem.readLock();
      try {
        for (LocatedBlock block : blocks.getLocatedBlocks()) {
          num = namesystem.countNodes(block.getBlock());
          totalCount += num.liveReplicas();
        }
        if (totalCount == rsCodec.numDataBlocks) {
          break;
        } else {
          LOG.info("wait for block report, received total replicas: " + totalCount);
        }
      } finally {
        namesystem.readUnlock();
      }
    } while (System.currentTimeMillis() - startTime < 30000);
    assertEquals(0, namesystem.getRaidMissingBlocksCount());
    assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing block
    assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());

  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }   
}
Project: hadoop-EAR    File: TestNodeCount.java
public void testInvalidateMultipleReplicas() throws Exception {
  Configuration conf = new Configuration();
  MiniDFSCluster cluster =
    new MiniDFSCluster(conf, 5, true, null);
  final int FILE_LEN = 123;
  final String pathStr = "/testInvalidateMultipleReplicas";
  try {
    FileSystem fs = cluster.getFileSystem();
    Path path = new Path(pathStr);
    cluster.waitActive();
    // create a small file on 3 nodes
    DFSTestUtil.createFile(fs, path, 123, (short)3, 0);
    DFSTestUtil.waitReplication(fs, path, (short)3);
    NameNode nn = cluster.getNameNode();
    LocatedBlocks located = nn.getBlockLocations(pathStr, 0, FILE_LEN);

    // Get the original block locations
    List<LocatedBlock> blocks = located.getLocatedBlocks();
    LocatedBlock firstBlock = blocks.get(0);

    DatanodeInfo[] locations = firstBlock.getLocations();
    assertEquals("Should have 3 good blocks", 3, locations.length);
    nn.getNamesystem().stallReplicationWork();

    DatanodeInfo[] badLocations = new DatanodeInfo[2];
    badLocations[0] = locations[0];
    badLocations[1] = locations[1];

    // Report some blocks corrupt
    LocatedBlock badLBlock = new LocatedBlock(
        firstBlock.getBlock(), badLocations);

    nn.reportBadBlocks(new LocatedBlock[] {badLBlock});

    nn.getNamesystem().restartReplicationWork();

    DFSTestUtil.waitReplication(fs, path, (short)3);
    NumberReplicas num = nn.getNamesystem().countNodes(
        firstBlock.getBlock());
    assertEquals(0, num.corruptReplicas());

  } finally {
    cluster.shutdown();
  }
}
Project: cumulus    File: BlockManager.java
void metaSave(PrintWriter out) {
  //
  // Dump contents of neededReplication
  //
  synchronized (neededReplications) {
    out.println("Metasave: Blocks waiting for replication: " + 
                neededReplications.size());
    for (Block block : neededReplications) {
      List<DatanodeDescriptor> containingNodes =
                                        new ArrayList<DatanodeDescriptor>();
      NumberReplicas numReplicas = new NumberReplicas();
      // source node returned is not used
      chooseSourceDatanode(block, containingNodes, numReplicas);
      int usableReplicas = numReplicas.liveReplicas() +
                           numReplicas.decommissionedReplicas();

      if (block instanceof BlockInfo) {
        String fileName = ((BlockInfo)block).getINode().getFullPathName();
        out.print(fileName + ": ");
      }
      // l: == live:, d: == decommissioned c: == corrupt e: == excess
      out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
                " (replicas:" +
                " l: " + numReplicas.liveReplicas() +
                " d: " + numReplicas.decommissionedReplicas() +
                " c: " + numReplicas.corruptReplicas() +
                " e: " + numReplicas.excessReplicas() + ") "); 

      Collection<DatanodeDescriptor> corruptNodes = 
                                    corruptReplicas.getNodes(block);

      for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
           jt.hasNext();) {
        DatanodeDescriptor node = jt.next();
        String state = "";
        if (corruptNodes != null && corruptNodes.contains(node)) {
          state = "(corrupt)";
        } else if (node.isDecommissioned() || 
            node.isDecommissionInProgress()) {
          state = "(decommissioned)";
        }          
        out.print(" " + node + state + " : ");
      }
      out.println("");
    }
  }

  //
  // Dump blocks from pendingReplication
  //
  pendingReplications.metaSave(out);

  //
  // Dump blocks that are waiting to be deleted
  //
  dumpRecentInvalidateSets(out);
}
Project: cumulus    File: BlockManager.java
/**
 * Parse the data-nodes the block belongs to and choose one,
 * which will be the replication source.
 *
 * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
 * since the former do not have write traffic and hence are less busy.
 * We do not use already decommissioned nodes as a source.
 * Otherwise we choose a random node among those that did not reach their
 * replication limit.
 *
 * In addition form a list of all nodes containing the block
 * and calculate its replication numbers.
 */
private DatanodeDescriptor chooseSourceDatanode(
                                  Block block,
                                  List<DatanodeDescriptor> containingNodes,
                                  NumberReplicas numReplicas) {
  containingNodes.clear();
  DatanodeDescriptor srcNode = null;
  int live = 0;
  int decommissioned = 0;
  int corrupt = 0;
  int excess = 0;
  Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
  while(it.hasNext()) {
    DatanodeDescriptor node = it.next();
    Collection<Block> excessBlocks =
      excessReplicateMap.get(node.getStorageID());
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
      corrupt++;
    else if (node.isDecommissionInProgress() || node.isDecommissioned())
      decommissioned++;
    else if (excessBlocks != null && excessBlocks.contains(block)) {
      excess++;
    } else {
      live++;
    }
    containingNodes.add(node);
    // Check if this replica is corrupt
    // If so, do not select the node as src node
    if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
      continue;
    if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
      continue; // already reached replication limit
    // the block must not be scheduled for removal on srcNode
    if(excessBlocks != null && excessBlocks.contains(block))
      continue;
    // never use already decommissioned nodes
    if(node.isDecommissioned())
      continue;
    // we prefer nodes that are in DECOMMISSION_INPROGRESS state
    if(node.isDecommissionInProgress() || srcNode == null) {
      srcNode = node;
      continue;
    }
    if(srcNode.isDecommissionInProgress())
      continue;
    // switch to a different node randomly
    // this to prevent from deterministically selecting the same node even
    // if the node failed to replicate the block on previous iterations
    if(r.nextBoolean())
      srcNode = node;
  }
  if(numReplicas != null)
    numReplicas.initialize(live, decommissioned, corrupt, excess);
  return srcNode;
}
Project: cumulus    File: BlockManager.java
/**
 * Return true if there are any blocks on this node that have not
 * yet reached their replication factor. Otherwise returns false.
 */
boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
  boolean status = false;
  int underReplicatedBlocks = 0;
  int decommissionOnlyReplicas = 0;
  int underReplicatedInOpenFiles = 0;
  final Iterator<? extends Block> it = srcNode.getBlockIterator();
  while(it.hasNext()) {
    final Block block = it.next();
    INode fileINode = blocksMap.getINode(block);

    if (fileINode != null) {
      NumberReplicas num = countNodes(block);
      int curReplicas = num.liveReplicas();
      int curExpectedReplicas = getReplication(block);
      if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
        if (curExpectedReplicas > curReplicas) {
          //Log info about one block for this node which needs replication
          if (!status) {
            status = true;
            logBlockReplicationInfo(block, srcNode, num);
          }
          underReplicatedBlocks++;
          if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
            decommissionOnlyReplicas++;
          }
          if (fileINode.isUnderConstruction()) {
            underReplicatedInOpenFiles++;
          }
        }
        if (!neededReplications.contains(block) &&
          pendingReplications.getNumReplicas(block) == 0) {
          //
          // These blocks have been reported from the datanode
          // after the startDecommission method has been executed. These
          // blocks were in flight when the decommissioning was started.
          //
          neededReplications.add(block,
                                 curReplicas,
                                 num.decommissionedReplicas(),
                                 curExpectedReplicas);
        }
      }
    }
  }
  srcNode.decommissioningStatus.set(underReplicatedBlocks,
      decommissionOnlyReplicas, 
      underReplicatedInOpenFiles);
  return status;
}
Project: RDFS    File: TestUnderReplicatedBlocks.java
public void testUnderReplicationWithDecommissionDataNode() throws Exception {
  final Configuration conf = new Configuration();
  final short REPLICATION_FACTOR = (short)1;
  File f = new File(HOST_FILE_PATH);
  if (f.exists()) {
    f.delete();
  }
  conf.set("dfs.hosts.exclude", HOST_FILE_PATH);
  LOG.info("Start the cluster");
  final MiniDFSCluster cluster = 
    new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
  try {
    final FSNamesystem namesystem = cluster.getNameNode().namesystem;
    final FileSystem fs = cluster.getFileSystem();
    DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
          namesystem.heartbeats.toArray(
              new DatanodeDescriptor[REPLICATION_FACTOR]);
    assertEquals(1, datanodes.length);
    // populate the cluster with a one block file
    final Path FILE_PATH = new Path("/testfile2");
    DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
    DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
    Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);

    // shutdown the datanode
    DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]);
    assertEquals(1, namesystem.getMissingBlocksCount()); // one missing block
    assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());

    // Make the only datanode to be decommissioned
    LOG.info("Decommission the datanode " + dnprop);
    addToExcludeFile(namesystem.getConf(), datanodes);
    namesystem.refreshNodes(namesystem.getConf());      

    // bring up the datanode
    cluster.restartDataNode(dnprop);

    // Wait for block report
    LOG.info("wait for its block report to come in");
    NumberReplicas num;
    long startTime = System.currentTimeMillis();
    do {
     namesystem.readLock();
     try {
       num = namesystem.countNodes(block);
     } finally {
       namesystem.readUnlock();
     }
     Thread.sleep(1000);
     LOG.info("live: " + num.liveReplicas() 
         + "Decom: " + num.decommissionedReplicas());
    } while (num.decommissionedReplicas() != 1 &&
        System.currentTimeMillis() - startTime < 30000);
    assertEquals("Decommissioning Replicas doesn't reach 1", 
        1, num.decommissionedReplicas());
    assertEquals(1, namesystem.getNonCorruptUnderReplicatedBlocks());
    assertEquals(0, namesystem.getMissingBlocksCount());
  } finally {
    cluster.shutdown();
  }
}
Project: RDFS    File: TestNodeCount.java
public void testInvalidateMultipleReplicas() throws Exception {
  Configuration conf = new Configuration();
  MiniDFSCluster cluster =
    new MiniDFSCluster(conf, 5, true, null);
  final int FILE_LEN = 123;
  final String pathStr = "/testInvalidateMultipleReplicas";
  try {
    FileSystem fs = cluster.getFileSystem();
    Path path = new Path(pathStr);
    cluster.waitActive();
    // create a small file on 3 nodes
    DFSTestUtil.createFile(fs, path, 123, (short)3, 0);
    DFSTestUtil.waitReplication(fs, path, (short)3);
    NameNode nn = cluster.getNameNode();
    LocatedBlocks located = nn.getBlockLocations(pathStr, 0, FILE_LEN);

    // Get the original block locations
    List<LocatedBlock> blocks = located.getLocatedBlocks();
    LocatedBlock firstBlock = blocks.get(0);

    DatanodeInfo[] locations = firstBlock.getLocations();
    assertEquals("Should have 3 good blocks", 3, locations.length);
    nn.getNamesystem().stallReplicationWork();

    DatanodeInfo[] badLocations = new DatanodeInfo[2];
    badLocations[0] = locations[0];
    badLocations[1] = locations[1];

    // Report some blocks corrupt
    LocatedBlock badLBlock = new LocatedBlock(
        firstBlock.getBlock(), badLocations);

    nn.reportBadBlocks(new LocatedBlock[] {badLBlock});

    nn.getNamesystem().restartReplicationWork();

    DFSTestUtil.waitReplication(fs, path, (short)3);
    NumberReplicas num = nn.getNamesystem().countNodes(
        firstBlock.getBlock());
    assertEquals(0, num.corruptReplicas());

  } finally {
    cluster.shutdown();
  }
}