Java class org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair usage examples (source code)

Project: hadoop    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  targetStorageTypes = new StorageType[blocks.length][];
  targetStorageIDs = new String[blocks.length][];

  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
    targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
    targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
  }
}
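
For context, a minimal self-contained sketch of the same flattening pattern used by this constructor: one entry per block, with a parallel array of per-block target descriptors. The Block, Storage, Pair, and BlockCommandSketch names below are simplified stand-ins for illustration, not the real Hadoop classes (BlockTargetPair instances normally come out of DatanodeDescriptor rather than being constructed directly).

import java.util.Arrays;
import java.util.List;

public class BlockCommandSketch {
  // Simplified stand-ins for Hadoop's Block, DatanodeStorageInfo and BlockTargetPair.
  record Block(long id) {}
  record Storage(String storageId) {}
  record Pair(Block block, Storage[] targets) {}

  final Block[] blocks;
  final String[][] targetStorageIDs;

  // Mirrors the flattening in BlockCommand(int, String, List<BlockTargetPair>):
  // walk the pairs once and spread them into parallel arrays indexed by block.
  BlockCommandSketch(List<Pair> pairs) {
    blocks = new Block[pairs.size()];
    targetStorageIDs = new String[blocks.length][];
    for (int i = 0; i < blocks.length; i++) {
      Pair p = pairs.get(i);
      blocks[i] = p.block();
      targetStorageIDs[i] = Arrays.stream(p.targets())
          .map(Storage::storageId)
          .toArray(String[]::new);
    }
  }

  public static void main(String[] args) {
    BlockCommandSketch cmd = new BlockCommandSketch(List.of(
        new Pair(new Block(1L), new Storage[]{new Storage("DS-1"), new Storage("DS-2")}),
        new Pair(new Block(2L), new Storage[]{new Storage("DS-3")})));
    System.out.println(Arrays.toString(cmd.blocks));               // [Block[id=1], Block[id=2]]
    System.out.println(Arrays.deepToString(cmd.targetStorageIDs)); // [[DS-1, DS-2], [DS-3]]
  }
}
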
Project: aliyun-oss-hadoop-fs    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  targetStorageTypes = new StorageType[blocks.length][];
  targetStorageIDs = new String[blocks.length][];

  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
    targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
    targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
  }
}
Project: big-c    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  targetStorageTypes = new StorageType[blocks.length][];
  targetStorageIDs = new String[blocks.length][];

  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
    targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
    targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  targetStorageTypes = new StorageType[blocks.length][];
  targetStorageIDs = new String[blocks.length][];

  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
    targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
    targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
  }
}
Project: FlexMap    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  targetStorageTypes = new StorageType[blocks.length][];
  targetStorageIDs = new String[blocks.length][];

  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
    targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
    targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
  }
}
Project: hadoop-on-lustre2    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  targetStorageIDs = new String[blocks.length][];

  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
    targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
  }
}
Project: hadoop    File: TestBlockManager.java
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
  // list for priority 1
  List<Block> list_p1 = new ArrayList<Block>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<Block>> list_all = new ArrayList<List<Block>>();
  list_all.add(new ArrayList<Block>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeReplicationWork should indicate replication is needed", 1,
      bm.computeReplicationWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeStorageInfo, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeStorageInfo[] targets = repl.getValue().targets;

  DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: hadoop    File: TestBlockManager.java
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
        repls.putAll(storage, thisRepls);
      }
    }
  }
  return repls;
}
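
The test helpers above group pending BlockTargetPair work under each storage with Guava's LinkedListMultimap. A small standalone sketch of that grouping pattern, assuming plain String keys and values as placeholders rather than the HDFS types:

import com.google.common.collect.LinkedListMultimap;
import java.util.List;
import java.util.Map.Entry;

public class MultimapSketch {
  public static void main(String[] args) {
    // Group pending "replication" items under the storage that must serve them,
    // preserving insertion order the way the test helpers do for BlockTargetPair lists.
    LinkedListMultimap<String, String> repls = LinkedListMultimap.create();
    repls.putAll("storage-1", List.of("blk_1001 -> [dn2, dn3]"));
    repls.putAll("storage-2", List.of("blk_1002 -> [dn1]", "blk_1003 -> [dn4]"));

    // size() counts key/value entries (not distinct keys); entries() iterates in insertion order.
    System.out.println(repls.size()); // 3
    for (Entry<String, String> e : repls.entries()) {
      System.out.println(e.getKey() + " : " + e.getValue());
    }
  }
}
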
Project: aliyun-oss-hadoop-fs    File: TestBlockManager.java
private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) {
  // list for priority 1
  List<BlockInfo> list_p1 = new ArrayList<>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<BlockInfo>> list_all = new ArrayList<>();
  list_all.add(new ArrayList<BlockInfo>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeBlockRecoveryWork should indicate replication is needed", 1,
      bm.computeRecoveryWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeStorageInfo, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeStorageInfo[] targets = repl.getValue().targets;

  DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: aliyun-oss-hadoop-fs    File: TestBlockManager.java
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
        repls.putAll(storage, thisRepls);
      }
    }
  }
  return repls;
}
Project: big-c    File: TestBlockManager.java
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
  // list for priority 1
  List<Block> list_p1 = new ArrayList<Block>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<Block>> list_all = new ArrayList<List<Block>>();
  list_all.add(new ArrayList<Block>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeReplicationWork should indicate replication is needed", 1,
      bm.computeReplicationWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeStorageInfo, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeStorageInfo[] targets = repl.getValue().targets;

  DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: big-c    File: TestBlockManager.java
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
        repls.putAll(storage, thisRepls);
      }
    }
  }
  return repls;
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestBlockManager.java
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
  // list for priority 1
  List<Block> list_p1 = new ArrayList<Block>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<Block>> list_all = new ArrayList<List<Block>>();
  list_all.add(new ArrayList<Block>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeReplicationWork should indicate replication is needed", 1,
      bm.computeReplicationWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeStorageInfo, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeStorageInfo[] targets = repl.getValue().targets;

  DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestBlockManager.java
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
        repls.putAll(storage, thisRepls);
      }
    }
  }
  return repls;
}
Project: hadoop-plus    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = p.targets;
  }
}
Project: hadoop-plus    File: TestBlockManager.java
private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
  // list for priority 1
  List<Block> list_p1 = new ArrayList<Block>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<Block>> list_all = new ArrayList<List<Block>>();
  list_all.add(new ArrayList<Block>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeReplicationWork should indicate replication is needed", 1,
      bm.computeReplicationWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeDescriptor, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeDescriptor[] targets = repl.getValue().targets;

  DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: hadoop-plus    File: TestBlockManager.java
private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      repls.putAll(dn, thisRepls);
    }
  }
  return repls;
}
Project: FlexMap    File: TestBlockManager.java
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
  // list for priority 1
  List<Block> list_p1 = new ArrayList<Block>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<Block>> list_all = new ArrayList<List<Block>>();
  list_all.add(new ArrayList<Block>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeReplicationWork should indicate replication is needed", 1,
      bm.computeReplicationWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeStorageInfo, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeStorageInfo[] targets = repl.getValue().targets;

  DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: FlexMap    File: TestBlockManager.java
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
        repls.putAll(storage, thisRepls);
      }
    }
  }
  return repls;
}
Project: hops    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 *
 * @param blocktargetlist
 *     blocks to be transferred
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()];
  targets = new DatanodeInfo[blocks.length][];
  for (int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = p.targets;
  }
}
Project: hops    File: TestBlockManager.java
private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
      LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      repls.putAll(dn, thisRepls);
    }
  }
  return repls;
}
Project: hadoop-TCP    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = p.targets;
  }
}
Project: hadoop-TCP    File: TestBlockManager.java
private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
  // list for priority 1
  List<Block> list_p1 = new ArrayList<Block>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<Block>> list_all = new ArrayList<List<Block>>();
  list_all.add(new ArrayList<Block>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeReplicationWork should indicate replication is needed", 1,
      bm.computeReplicationWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeDescriptor, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeDescriptor[] targets = repl.getValue().targets;

  DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: hadoop-TCP    File: TestBlockManager.java
private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      repls.putAll(dn, thisRepls);
    }
  }
  return repls;
}
Project: hardfs    File: BlockCommand.java
/**
 * Create BlockCommand for transferring blocks to another datanode
 * @param blocktargetlist    blocks to be transferred 
 */
public BlockCommand(int action, String poolId,
    List<BlockTargetPair> blocktargetlist) {
  super(action);
  this.poolId = poolId;
  blocks = new Block[blocktargetlist.size()]; 
  targets = new DatanodeInfo[blocks.length][];
  for(int i = 0; i < blocks.length; i++) {
    BlockTargetPair p = blocktargetlist.get(i);
    blocks[i] = p.block;
    targets[i] = p.targets;
  }
}
Project: hardfs    File: TestBlockManager.java
private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
  // list for priority 1
  List<Block> list_p1 = new ArrayList<Block>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<Block>> list_all = new ArrayList<List<Block>>();
  list_all.add(new ArrayList<Block>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeReplicationWork should indicate replication is needed", 1,
      bm.computeReplicationWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeDescriptor, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeDescriptor[] targets = repl.getValue().targets;

  DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: hardfs    File: TestBlockManager.java
private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      repls.putAll(dn, thisRepls);
    }
  }
  return repls;
}
Project: hadoop-on-lustre2    File: TestBlockManager.java
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
  // list for priority 1
  List<Block> list_p1 = new ArrayList<Block>();
  list_p1.add(block);

  // list of lists for each priority
  List<List<Block>> list_all = new ArrayList<List<Block>>();
  list_all.add(new ArrayList<Block>()); // for priority 0
  list_all.add(list_p1); // for priority 1

  assertEquals("Block not initially pending replication", 0,
      bm.pendingReplications.getNumReplicas(block));
  assertEquals(
      "computeReplicationWork should indicate replication is needed", 1,
      bm.computeReplicationWorkForBlocks(list_all));
  assertTrue("replication is pending after work is computed",
      bm.pendingReplications.getNumReplicas(block) > 0);

  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
  assertEquals(1, repls.size());
  Entry<DatanodeStorageInfo, BlockTargetPair> repl =
    repls.entries().iterator().next();

  DatanodeStorageInfo[] targets = repl.getValue().targets;

  DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
  pipeline[0] = repl.getKey();
  System.arraycopy(targets, 0, pipeline, 1, targets.length);

  return pipeline;
}
Project: hadoop-on-lustre2    File: TestBlockManager.java
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
  LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
    LinkedListMultimap.create();
  for (DatanodeDescriptor dn : nodes) {
    List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
    if (thisRepls != null) {
      for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
        repls.putAll(storage, thisRepls);
      }
    }
  }
  return repls;
}
Project: aliyun-oss-hadoop-fs    File: DatanodeManager.java
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount, 
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) throws IOException {
  final DatanodeDescriptor nodeinfo;
  try {
    nodeinfo = getDatanode(nodeReg);
  } catch (UnregisteredNodeException e) {
    return new DatanodeCommand[]{RegisterCommand.REGISTER};
  }

  // Check if this datanode should actually be shutdown instead.
  if (nodeinfo != null && nodeinfo.isDisallowed()) {
    setDatanodeDead(nodeinfo);
    throw new DisallowedDatanodeException(nodeinfo);
  }

  if (nodeinfo == null || !nodeinfo.isRegistered()) {
    return new DatanodeCommand[]{RegisterCommand.REGISTER};
  }
  heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
      cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);

  // If we are in safemode, do not send back any recovery / replication
  // requests. Don't even drain the existing queue of work.
  if (namesystem.isInSafeMode()) {
    return new DatanodeCommand[0];
  }

  // block recovery command
  final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
      nodeinfo);
  if (brCommand != null) {
    return new DatanodeCommand[]{brCommand};
  }

  final List<DatanodeCommand> cmds = new ArrayList<>();
  // check pending replication
  List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
      maxTransfers);
  if (pendingList != null) {
    cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
        pendingList));
  }
  // check pending erasure coding tasks
  List<BlockECRecoveryInfo> pendingECList = nodeinfo.getErasureCodeCommand(
      maxTransfers);
  if (pendingECList != null) {
    cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY,
        pendingECList));
  }
  // check block invalidation
  Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
  if (blks != null) {
    cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
        blks));
  }
  // cache commands
  addCacheCommands(blockPoolId, nodeinfo, cmds);
  // key update command
  blockManager.addKeyUpdateCommand(cmds, nodeinfo);

  // check for balancer bandwidth update
  if (nodeinfo.getBalancerBandwidth() > 0) {
    cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
    // set back to 0 to indicate that datanode has been sent the new value
    nodeinfo.setBalancerBandwidth(0);
  }

  if (!cmds.isEmpty()) {
    return cmds.toArray(new DatanodeCommand[cmds.size()]);
  }

  return new DatanodeCommand[0];
}
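
The heartbeat handler above follows a fixed priority: safe mode sends nothing, an outstanding block-recovery command is returned on its own, and only then are transfer, erasure-coding, invalidation, caching, key-update and bandwidth commands accumulated in order. A compact, hypothetical sketch of that control flow with placeholder command types (Command, buildCommands and the string kinds are illustrative, not the real DatanodeProtocol API):

import java.util.ArrayList;
import java.util.List;

public class HeartbeatSketch {
  record Command(String kind, Object payload) {}

  // Hypothetical reduction of handleHeartbeat: nothing is sent in safe mode,
  // a pending recovery command preempts all other work, otherwise the
  // remaining command types are collected in a fixed order.
  static List<Command> buildCommands(boolean inSafeMode,
                                     Command recovery,
                                     List<String> pendingTransfers,
                                     List<String> invalidations) {
    if (inSafeMode) {
      return List.of();                 // do not drain any work while in safe mode
    }
    if (recovery != null) {
      return List.of(recovery);         // recovery is returned alone
    }
    List<Command> cmds = new ArrayList<>();
    if (!pendingTransfers.isEmpty()) {
      cmds.add(new Command("DNA_TRANSFER", pendingTransfers));
    }
    if (!invalidations.isEmpty()) {
      cmds.add(new Command("DNA_INVALIDATE", invalidations));
    }
    return cmds;
  }

  public static void main(String[] args) {
    System.out.println(buildCommands(false, null,
        List.of("blk_1001 -> dn2"), List.of("blk_0042")));
  }
}
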
Project: hops    File: DatanodeManager.java
/**
 * Handle heartbeat from datanodes.
 */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    final String blockPoolId, long capacity, long dfsUsed, long remaining,
    long blockPoolUsed, int xceiverCount, int maxTransfers, int failedVolumes)
    throws IOException {
  synchronized (heartbeatManager) {
    synchronized (datanodeMap) {
      DatanodeDescriptor nodeinfo = null;
      try {
        nodeinfo = getDatanode(nodeReg);
      } catch (UnregisteredNodeException e) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
      }

      // Check if this datanode should actually be shutdown instead. 
      if (nodeinfo != null && nodeinfo.isDisallowed()) {
        setDatanodeDead(nodeinfo);
        throw new DisallowedDatanodeException(nodeinfo);
      }

      if (nodeinfo == null || !nodeinfo.isAlive) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
      }

      heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed, remaining,
          blockPoolUsed, xceiverCount, failedVolumes);

      //check lease recovery
      BlockInfoUnderConstruction[] blocks =
          nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
      if (blocks != null) {
        BlockRecoveryCommand brCommand =
            new BlockRecoveryCommand(blocks.length);
        for (BlockInfoUnderConstruction b : blocks) {
          brCommand.add(new RecoveringBlock(new ExtendedBlock(blockPoolId, b),
              getDataNodeDescriptorsTx(b), b.getBlockRecoveryId()));
        }
        return new DatanodeCommand[]{brCommand};
      }

      final List<DatanodeCommand> cmds = new ArrayList<>();
      //check pending replication
      List<BlockTargetPair> pendingList =
          nodeinfo.getReplicationCommand(maxTransfers);
      if (pendingList != null) {
        cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
            pendingList));
      }
      //check block invalidation
      Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
      if (blks != null) {
        cmds.add(
            new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
                blks));
      }

      blockManager.addKeyUpdateCommand(cmds, nodeinfo);

      // check for balancer bandwidth update
      if (nodeinfo.getBalancerBandwidth() > 0) {
        cmds.add(
            new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
        // set back to 0 to indicate that datanode has been sent the new value
        nodeinfo.setBalancerBandwidth(0);
      }

      if (!cmds.isEmpty()) {
        return cmds.toArray(new DatanodeCommand[cmds.size()]);
      }
    }
  }

  return new DatanodeCommand[0];
}