Example source code for the Java class org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus

Project: incubator-blur    File: BlurBlockPlacementStatusDefault.java
public BlurBlockPlacementStatusDefault(BlockPlacementStatus original, String shardServer) {
  _original = original;
  _shardServer = shardServer;
  // Remember whether the wrapped (default) placement status was already
  // satisfied; when no original status is supplied, treat it as satisfied.
  if (_original != null) {
    _origPlacementPolicySatisfied = _original.isPlacementPolicySatisfied();
  } else {
    _origPlacementPolicySatisfied = true;
  }
}
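The snippet above shows only the constructor: it stores the wrapped status and the shard server, and records whether the wrapped policy was already satisfied. As a rough sketch of how such a delegating wrapper can complete the BlockPlacementStatus contract, assuming the two-method form of the interface (newer Hadoop versions declare additional methods); the class name and the shard-server rule below are assumptions for illustration, not Blur's actual code:

import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;

// Hypothetical delegating wrapper; field names follow the constructor above,
// but the class name and the shard-server rule are assumptions for illustration.
public class ShardAwareBlockPlacementStatus implements BlockPlacementStatus {

  private final BlockPlacementStatus _original;
  private final String _shardServer;
  private final boolean _origPlacementPolicySatisfied;
  private final boolean _shardServerHasReplica; // assumed extra input

  public ShardAwareBlockPlacementStatus(BlockPlacementStatus original,
      String shardServer, boolean shardServerHasReplica) {
    _original = original;
    _shardServer = shardServer;
    _origPlacementPolicySatisfied =
        original == null || original.isPlacementPolicySatisfied();
    _shardServerHasReplica = shardServerHasReplica;
  }

  @Override
  public boolean isPlacementPolicySatisfied() {
    // Satisfied only if the wrapped policy is satisfied and the shard server
    // holds a replica (the second condition is assumed for this sketch).
    return _origPlacementPolicySatisfied && _shardServerHasReplica;
  }

  @Override
  public String getErrorDescription() {
    if (isPlacementPolicySatisfied()) {
      return null;
    }
    if (!_origPlacementPolicySatisfied && _original != null) {
      return _original.getErrorDescription();
    }
    return "No replica of the block is stored on shard server " + _shardServer;
  }
}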
Project: aliyun-oss-hadoop-fs    File: TestBalancer.java
private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
    long[] capacities, String[] hosts, String[] racks, String[] UDs,
    long newCapacity, String newHost, String newRack, String newUD)
        throws Exception {
  int numOfDatanodes = capacities.length;

  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
      .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
  DatanodeManager dm = cluster.getNamesystem().getBlockManager().
      getDatanodeManager();
  if (UDs != null) {
    for(int i = 0; i < UDs.length; i++) {
      DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
      dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
    }
  }

  try {
    cluster.waitActive();
    client = NameNodeProxies.createProxy(conf,
        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();

    // fill up the cluster to be 80% full
    long totalCapacity = sum(capacities);
    long totalUsedSpace = totalCapacity * 8 / 10;

    final long fileSize = totalUsedSpace / numOfDatanodes;
    DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
        fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);

    // start up an empty node with the same capacity on the same rack as the
    // pinned host.
    cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
        new String[] { newHost }, new long[] { newCapacity });
    if (newUD != null) {
      DatanodeID newId = cluster.getDataNodes().get(
          numOfDatanodes).getDatanodeId();
      dm.getDatanode(newId).setUpgradeDomain(newUD);
    }
    totalCapacity += newCapacity;

    // run balancer and validate results
    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

    // start rebalancing
    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
    Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
    // validate placement: every block of the file must satisfy the policy
    BlockPlacementPolicy placementPolicy =
        cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
    List<LocatedBlock> locatedBlocks = client.
        getBlockLocations(fileName, 0, fileSize).getLocatedBlocks();
    for (LocatedBlock locatedBlock : locatedBlocks) {
      BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
          locatedBlock.getLocations(), numOfDatanodes);
      assertTrue(status.isPlacementPolicySatisfied());
    }
  } finally {
    cluster.shutdown();
  }
}
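The verification step at the end of this test also works outside a MiniDFSCluster: any code that has access to the active BlockPlacementPolicy and a block's replica locations can ask for a BlockPlacementStatus. A minimal sketch follows; the helper class, method name, and logging are illustrative assumptions, and only the verifyBlockPlacement(...) call mirrors the test above:

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;

// Hypothetical helper that reports whether a block's current replica
// locations satisfy the active placement policy.
public final class PlacementCheck {

  private PlacementCheck() {
  }

  public static boolean isWellPlaced(BlockPlacementPolicy policy,
      DatanodeInfo[] locations, int requiredReplicas) {
    BlockPlacementStatus status =
        policy.verifyBlockPlacement(locations, requiredReplicas);
    if (!status.isPlacementPolicySatisfied()) {
      // getErrorDescription() explains which constraint was violated.
      System.err.println("Placement violated: " + status.getErrorDescription());
      return false;
    }
    return true;
  }
}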