Example source code for the Java class org.apache.hadoop.hdfs.server.protocol.BlockReport

Project: hadoop-EAR    File: AvatarNode.java
public DatanodeCommand blockReportNew(DatanodeRegistration nodeReg, BlockReport rep) throws IOException {
  if (runInfo.shutdown || !runInfo.isRunning) {
    return null;
  }
  if (ignoreDatanodes()) {
    LOG.info("Standby fell behind. Telling " + nodeReg.toString() +
              " to back off");
    // Do not process block reports yet as the ingest thread is catching up
    return AvatarDatanodeCommand.BACKOFF;
  }

  if (currentAvatar == Avatar.STANDBY) {
    Collection<Block> failed = super.blockReportWithRetries(nodeReg, rep);

    // standby should send only DNA_RETRY
    BlockCommand bCmd = new BlockCommand(DatanodeProtocols.DNA_RETRY,
        failed.toArray(new Block[failed.size()]));
    return bCmd;
  } else {
    // only the primary can send DNA_FINALIZE
    return super.blockReport(nodeReg, rep);
  }
}
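A note on the two commands returned above: DNA_RETRY carries the blocks the standby could not process yet, while BACKOFF asks the datanode to delay its report. A minimal, hedged sketch of how a datanode-side handler might react; the helpers scheduleRetry and resendBlocks are hypothetical, not the AvatarDataNode API:

// Hedged sketch only; scheduleRetry/resendBlocks are hypothetical helpers.
void handleBlockReportResponse(DatanodeCommand cmd) throws IOException {
  if (cmd == AvatarDatanodeCommand.BACKOFF) {
    // Standby ingest is catching up; try the full report again later.
    scheduleRetry();
  } else if (cmd instanceof BlockCommand
      && cmd.getAction() == DatanodeProtocols.DNA_RETRY) {
    // The standby returns the blocks it failed to process; resend them.
    Block[] toRetry = ((BlockCommand) cmd).getBlocks();
    resendBlocks(toRetry);
  }
}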
Project: hops    File: BlockManager.java
private HashMatchingResult calculateMismatchedHashes(DatanodeDescriptor dn,
    BlockReport report) throws IOException {
  List<HashBucket> allMachineHashes = HashBuckets.getInstance()
      .getBucketsForDatanode(dn);
  List<Integer> matchedBuckets = new ArrayList<>();
  List<Integer> mismatchedBuckets = new ArrayList<>();

  for (int i = 0; i < report.getBuckets().length; i++){
    boolean matched = false;
    for (HashBucket bucket : allMachineHashes){
      if (bucket.getBucketId() == i && bucket.getHash() == report
          .getHashes()[i]){
        matched = true;
        break;
      }
    }
    if (matched){
      matchedBuckets.add(i);
    } else {
      mismatchedBuckets.add(i);
    }
  }

  return new HashMatchingResult(matchedBuckets, mismatchedBuckets);
}
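The method above lets the namenode skip buckets whose hashes already match, so only mismatched buckets need a full rescan. A self-contained sketch of the same idea over dense arrays; this is illustrative only, not the hops API, and it assumes the stored hashes are indexed by bucket id rather than fetched per datanode:

// Illustrative only: dense-array version of the bucket matching above.
static List<Integer> mismatchedBuckets(long[] reported, long[] stored) {
  List<Integer> mismatched = new ArrayList<>();
  for (int i = 0; i < reported.length; i++) {
    // Only buckets whose stored hash differs need a full rescan.
    if (stored[i] != reported[i]) {
      mismatched.add(i);
    }
  }
  return mismatched;
}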
Project: hops    File: NNThroughputBenchmark.java
void register() throws IOException {
  // get versions from the namenode
  nsInfo = nameNodeProto.versionRequest();
  dnRegistration = new DatanodeRegistration(
      new DatanodeID(DNS.getDefaultIP("default"),
          DNS.getDefaultHost("default", "default"), "", getNodePort(dnIdx),
          DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
          DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT),
      new DataStorage(nsInfo, ""), new ExportedBlockKeys(),
      VersionInfo.getVersion());
  DataNode.setNewStorageID(dnRegistration);
  // register datanode
  dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
  // first block report
  storage = new DatanodeStorage(dnRegistration.getStorageID());
  final StorageBlockReport[] reports = {new StorageBlockReport(storage,
      BlockReport.builder(NUM_BUCKETS).build())};
  nameNodeProto.blockReport(dnRegistration,
      nameNode.getNamesystem().getBlockPoolId(), reports);
}
Project: hops    File: TestBlockManager.java
@Test
public void testSafeModeIBRAfterIncremental() throws Exception {
  DatanodeDescriptor node = spy(nodes.get(0));
  node.setStorageID("dummy-storage");
  node.isAlive = true;

  DatanodeRegistration nodeReg =
      new DatanodeRegistration(node, null, null, "");

  // pretend to be in safemode
  doReturn(true).when(fsn).isInStartupSafeMode();

  // register new node
  bm.getDatanodeManager().registerDatanode(nodeReg);
  bm.getDatanodeManager().addDatanode(node); // swap in spy    
  assertEquals(node, bm.getDatanodeManager().getDatanode(node));
  assertTrue(node.isFirstBlockReport());
  // send block report while pretending to already have blocks
  reset(node);
  doReturn(1).when(node).numBlocks();
  bm.processReport(node, "pool", BlockReport.builder(numBuckets).build());
  verify(node).receivedBlockReport();
  assertFalse(node.isFirstBlockReport());
}
Project: hops    File: TestBlockReport.java
/**
 * The test creates a file and closes it, then starts a second datanode
 * in the cluster. As soon as the replication process completes, the test
 * sends a block report and checks that no under-replicated blocks are left.
 *
 * @throws IOException
 *     in case of an error
 */
@Test
public void blockReport_06() throws Exception {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path filePath = new Path("/" + METHOD_NAME + ".dat");
  final int DN_N1 = DN_N0 + 1;

  ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
  startDNandWait(filePath, true);

  // all blocks belong to the same file, hence same BP
  DataNode dn = cluster.getDataNodes().get(DN_N1);
  String poolId = cluster.getNamesystem().getBlockPoolId();
  DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
  StorageBlockReport[] report =
      {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()),
          BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())};
  cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
  printStats();
  Thread.sleep(10000); //HOP: wait for the replication monitor to catch up
  assertEquals("Wrong number of PendingReplication Blocks", 0,
      cluster.getNamesystem().getUnderReplicatedBlocks());
}
Project: RDFS    File: AvatarNode.java
public DatanodeCommand blockReportNew(DatanodeRegistration nodeReg, BlockReport rep) throws IOException {
  if (runInfo.shutdown || !runInfo.isRunning) {
    return null;
  }
  if (ignoreDatanodes()) {
    LOG.info("Standby fell behind. Telling " + nodeReg.toString() +
              " to back off");
    // Do not process block reports yet as the ingest thread is catching up
    return AvatarDatanodeCommand.BACKOFF;
  }

  if (currentAvatar == Avatar.STANDBY) {
    Collection<Block> failed = super.blockReportWithRetries(nodeReg, rep);

    BlockCommand bCmd = new BlockCommand(DatanodeProtocols.DNA_RETRY,
        failed.toArray(new Block[failed.size()]));
    return bCmd;
  } else {
    return super.blockReport(nodeReg, rep);
  }
}
Project: hadoop-EAR    File: NameNode.java
/**
* Add new replica blocks to the inode-to-target mapping,
* and add the inode file to the DatanodeDescriptor.
*/
public void blocksBeingWrittenReport(DatanodeRegistration nodeReg,
    BlockReport blocks) throws IOException {
  verifyRequest(nodeReg);
  long[] blocksAsLong = blocks.getBlockReportInLongs();
  BlockListAsLongs blist = new BlockListAsLongs(blocksAsLong);
  boolean processed = namesystem.processBlocksBeingWrittenReport(nodeReg, blist);

  String message = "*BLOCK* NameNode.blocksBeingWrittenReport: "
      +"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks() +" blocks";
  if (!processed) {
    message += " was discarded.";
  }
  stateChangeLog.info(message);
}
Project: hadoop-EAR    File: NameNode.java
protected Collection<Block> blockReportWithRetries(
    DatanodeRegistration nodeReg, BlockReport blocks) throws IOException {
  verifyRequest(nodeReg);
  myMetrics.numBlockReport.inc();
  BlockListAsLongs blist =
    new BlockListAsLongs(blocks.getBlockReportInLongs());
  stateChangeLog.debug("*BLOCK* NameNode.blockReport: " + "from "
      + nodeReg.getName() + " " + blist.getNumberOfBlocks() + " blocks");

  return namesystem.processReport(nodeReg, blist);
}
Project: hadoop-EAR    File: DataNode.java
/**
 * Sends a 'Blocks Being Written' report to the given node.
 *
 * @param node the node to send the report to
 * @throws IOException
 */
public void sendBlocksBeingWrittenReport(DatanodeProtocol node,
    int namespaceId, DatanodeRegistration nsRegistration) throws IOException {
  Block[] blocks = data.getBlocksBeingWrittenReport(namespaceId);
  if (blocks != null && blocks.length != 0) {
    long[] blocksAsLong =
      BlockListAsLongs.convertToArrayLongs(blocks);
    BlockReport bbwReport = new BlockReport(blocksAsLong);
    node.blocksBeingWrittenReport(nsRegistration, bbwReport);
  }
}
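Both sides of this RPC rely on the long[] encoding of a block list. A hedged round-trip sketch using only the calls that appear in this listing:

// Hedged sketch: encode on the datanode side, decode on the namenode side.
static int roundTripBlockCount(Block[] blocks) {
  long[] asLongs = BlockListAsLongs.convertToArrayLongs(blocks); // sender
  BlockListAsLongs decoded = new BlockListAsLongs(asLongs);      // receiver
  return decoded.getNumberOfBlocks(); // expected to equal blocks.length
}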
Project: hops    File: NameNodeRpcServer.java
@Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
    String poolId, StorageBlockReport[] reports) throws IOException {
  verifyRequest(nodeReg);

  BlockReport blist = reports[0].getReport(); // assume no federation: use storage report '0'
  if (blockStateChangeLog.isDebugEnabled()) {
    blockStateChangeLog.debug(
        "*BLOCK* NameNode.blockReport: " + "from " + nodeReg + " " +
            blist.getNumBlocks() + " blocks");
  }

  namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
  return new FinalizeCommand(poolId);
}
Project: hops    File: HashBuckets.java
public void applyHash(int storageId, HdfsServerConstants.ReplicaState state,
    Block block ) throws TransactionContextException, StorageException {
  int bucketId = getBucketForBlock(block);
  HashBucket bucket = getBucket(storageId, bucketId);


  long newHash = bucket.getHash() + BlockReport.hash(block, state);
  LOG.debug("Applying block:" + blockToString
      (block) + "sid: " + storageId + "state: " + state.name() + ", hash: "
      + BlockReport.hash(block, state));

  bucket.setHash(newHash);
}
Project: hops    File: HashBuckets.java
public void undoHash(int storageId, HdfsServerConstants.ReplicaState
    state, Block block) throws TransactionContextException, StorageException {
  int bucketId = getBucketForBlock(block);
  HashBucket bucket = getBucket(storageId, bucketId);
  long newHash = bucket.getHash() - BlockReport.hash(block, state);
  LOG.debug("Undo block:" + blockToString
      (block) + "sid: " + storageId + "state: " + state.name() + ", hash: " +
      BlockReport.hash(block,state));

  bucket.setHash(newHash);
}
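Because each block contributes additively to its bucket hash, applyHash followed by undoHash restores the bucket to its previous value regardless of interleaved updates. A self-contained sketch of that invariant; the contribution function is a stand-in for BlockReport.hash, not the real implementation:

// Self-contained sketch of the additive bucket-hash invariant.
class AdditiveHashSketch {
  private long bucketHash = 0L;

  // Stand-in for BlockReport.hash(block, state): any deterministic
  // per-(block, state) value preserves the invariant.
  static long contribution(long blockId, long genStamp, int state) {
    return 31L * (31L * blockId + genStamp) + state;
  }

  void apply(long blockId, long genStamp, int state) {
    bucketHash += contribution(blockId, genStamp, state);
  }

  void undo(long blockId, long genStamp, int state) {
    bucketHash -= contribution(blockId, genStamp, state);
  }
}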
Project: hops    File: FsDatasetImpl.java
/**
 * Generates a block report from the in-memory block map.
 */
@Override // FsDatasetSpi
public BlockReport getBlockReport(String bpid) {
  int size = volumeMap.size(bpid);
  BlockReport.Builder builder = BlockReport.builder(NUM_BUCKETS);
  if (size == 0) {
    return builder.build();
  }

  synchronized (this) {
    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
      switch (b.getState()) {
        case FINALIZED:
        case RBW:
        case RWR:
          builder.add(b);
          break;
        case RUR:
          ReplicaUnderRecovery rur = (ReplicaUnderRecovery) b;
          builder.add(rur.getOriginalReplica());
          break;
        case TEMPORARY:
          break;
        default:
          assert false : "Illegal ReplicaInfo state.";
      }
    }
    return builder.build();
  }
}
Project: hops    File: PBHelper.java
public static DatanodeProtocolProtos.BlockReportProto convert(BlockReport report) {

  List<DatanodeProtocolProtos.BlockReportBucketProto> bucketProtos = new
      ArrayList<>();
  for (BlockReportBucket bucket : report.getBuckets()){

    DatanodeProtocolProtos.BlockReportBucketProto.Builder bucketBuilder =
        DatanodeProtocolProtos.BlockReportBucketProto.newBuilder();
    for (BlockReportBlock block : bucket.getBlocks()){
      bucketBuilder.addBlocks(
          DatanodeProtocolProtos.BlockReportBlockProto.newBuilder()
              .setBlockId(block.getBlockId())
              .setGenerationStamp(block.getGenerationStamp())
              .setLength(block.getLength())
              .setState(convert(block.getState())));
    }
    bucketProtos.add(bucketBuilder.build());
  }

  List<Long> hashes = new ArrayList<>();
  for (long hash : report.getHashes()){
    hashes.add(hash);
  }

  return DatanodeProtocolProtos.BlockReportProto.newBuilder()
      .addAllBuckets(bucketProtos)
      .addAllHashes(hashes)
      .build();
}
Project: hops    File: PBHelper.java
public static BlockReport convert(
    DatanodeProtocolProtos.BlockReportProto blockReportProto) {
  int numBuckets = blockReportProto.getBucketsCount();

  BlockReportBucket[] buckets = new BlockReportBucket[numBuckets];
  long[] hashes = new long[numBuckets];
  int numBlocks = 0;

  for (int i = 0; i < numBuckets; i++) {
    DatanodeProtocolProtos.BlockReportBucketProto bucketProto = blockReportProto.getBuckets(i);
    int numBlocksInBucket = bucketProto.getBlocksCount();

    numBlocks += numBlocksInBucket;

    BlockReportBlock[] blocks = new BlockReportBlock[numBlocksInBucket];
    for (int j = 0; j < numBlocksInBucket; j++){
      DatanodeProtocolProtos.BlockReportBlockProto blockProto = bucketProto.getBlocks(j);
      blocks[j] = new BlockReportBlock(blockProto.getBlockId(), blockProto
          .getGenerationStamp(), blockProto.getLength(), convert(blockProto
          .getState()));
    }

    BlockReportBucket bucket = new BlockReportBucket();
    bucket.setBlocks(blocks);
    buckets[i] = bucket;
    hashes[i] = blockReportProto.getHashes(i);
  }

  return new BlockReport(buckets, hashes, numBlocks);
}
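The two converters above are inverses, which makes a round trip a natural smoke test. A hedged usage sketch; numBuckets stands in for the configured bucket count:

// Hedged round-trip sketch; numBuckets is a placeholder for the configured count.
static void roundTripSmokeTest(int numBuckets) {
  BlockReport original = BlockReport.builder(numBuckets).build();
  DatanodeProtocolProtos.BlockReportProto proto = PBHelper.convert(original);
  BlockReport roundTripped = PBHelper.convert(proto);
  // Bucket count and block count should survive the round trip.
  assert roundTripped.getNumBlocks() == original.getNumBlocks();
}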
Project: hops    File: MiniDFSCluster.java
/**
 * @return block reports from all datanodes; the returned array is indexed
 * in the same order as the list of datanodes returned by getDataNodes()
 */
public Iterable<BlockReportBlock>[] getAllBlockReports(String bpid) {
  int numDataNodes = dataNodes.size();
  Iterable<BlockReportBlock>[] result = new BlockReport[numDataNodes];
  for (int i = 0; i < numDataNodes; ++i) {
    result[i] = getBlockReport(bpid, i);
  }
  return result;
}
Project: hops    File: NNThroughputBenchmark.java
void formBlockReport() {
  // fill remaining slots with blocks that do not exist
  for (int idx = blocks.size() - 1; idx >= nrBlocks; idx--) {
    blocks.set(idx, new Block(blocks.size() - idx, 0, 0));
  }
  blockReportList =
      BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build();
}
Project: hops    File: TestBlockManager.java
@Test
public void testSafeModeIBR() throws Exception {
  DatanodeDescriptor node = spy(nodes.get(0));
  node.setStorageID("dummy-storage");
  node.isAlive = true;

  DatanodeRegistration nodeReg =
      new DatanodeRegistration(node, null, null, "");

  // pretend to be in safemode
  doReturn(true).when(fsn).isInStartupSafeMode();

  // register new node
  bm.getDatanodeManager().registerDatanode(nodeReg);
  bm.getDatanodeManager().addDatanode(node); // swap in spy    
  assertEquals(node, bm.getDatanodeManager().getDatanode(node));
  assertTrue(node.isFirstBlockReport());
  // send block report, should be processed
  reset(node);
  bm.processReport(node, "pool", BlockReport.builder(numBuckets).build());
  verify(node).receivedBlockReport();
  assertFalse(node.isFirstBlockReport());
  // send block report again, should NOT be processed
  reset(node);
  bm.processReport(node, "pool", BlockReport.builder(numBuckets).build());
  verify(node, never()).receivedBlockReport();
  assertFalse(node.isFirstBlockReport());

  // re-register as if node restarted, should update existing node
  bm.getDatanodeManager().removeDatanode(node);
  reset(node);
  bm.getDatanodeManager().registerDatanode(nodeReg);
  verify(node).updateRegInfo(nodeReg);
  assertTrue(node.isFirstBlockReport()); // ready for report again
  // send block report, should be processed after restart
  reset(node);
  bm.processReport(node, "pool", BlockReport.builder(numBuckets).build());
  verify(node).receivedBlockReport();
  assertFalse(node.isFirstBlockReport());
}
Project: hops    File: SimulatedFSDataset.java
@Override
public synchronized BlockReport getBlockReport(String bpid) {
  final List<Block> blocks = new ArrayList<>();
  final Map<Block, BInfo> map = blockMap.get(bpid);
  BlockReport.Builder builder = BlockReport.builder(NUM_BUCKETS);
  if (map != null) {
    for (BInfo b : map.values()) {
      if (b.isFinalized()) {
        builder.addAsFinalized(b.theBlock);
      }
    }
  }
  return builder.build();
}
Project: hops    File: TestSimulatedFSDataset.java
@Test
public void testGetBlockReport() throws IOException {
  SimulatedFSDataset fsdataset = getSimulatedFSDataset();
  BlockReport blockReport = fsdataset.getBlockReport(bpid);
  assertEquals(0, blockReport.getNumBlocks());
  addSomeBlocks(fsdataset);
  blockReport = fsdataset.getBlockReport(bpid);
  assertEquals(NUMBLOCKS, blockReport.getNumBlocks());
  for (BlockReportBlock b : blockReport) {
    assertNotNull(b);
    assertEquals(blockIdToLen(b.getBlockId()), b.getLength());
  }
}
Project: hops    File: TestBlockReport.java
/**
 * The test writes a file and closes it. Then it finds a block
 * and changes its generation stamp (GS) to be less than the original one.
 * A new empty block is added to the list of blocks.
 * A block report is forced, and the number of corrupted blocks is checked.
 *
 * @throws IOException
 *     in case of an error
 */
@Test
public void blockReport_03() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path filePath = new Path("/" + METHOD_NAME + ".dat");

  ArrayList<Block> blocks = prepareForRide(filePath, METHOD_NAME, FILE_SIZE);

  // The block with the modified GS won't be found and has to be deleted.
  blocks.get(0).setGenerationStampNoPersistance(rand.nextLong());
  // This new block is unknown to the NN and will be marked for deletion.
  blocks.add(new Block());

  // all blocks belong to the same file, hence same BP
  DataNode dn = cluster.getDataNodes().get(DN_N0);
  String poolId = cluster.getNamesystem().getBlockPoolId();
  DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
  StorageBlockReport[] report =
      {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()),
          BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())};
  DatanodeCommand dnCmd =
      cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Got the command: " + dnCmd);
  }
  printStats();

  assertEquals(
      "Wrong number of CorruptedReplica+PendingDeletion blocks found", 2,
      cluster.getNamesystem().getCorruptReplicaBlocks() +
          cluster.getNamesystem().getPendingDeletionBlocks());
}
Project: hops    File: TestBlockReport.java
@Test
public void blockReportRegression() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();


  ArrayList<Block> blocks = new ArrayList<>();

  for(int i = 0 ; i < 3; i++){
    Path filePath = new Path("/" + METHOD_NAME +i+ ".dat");
    DFSTestUtil
      .createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong());
    blocks.addAll(locatedToBlocks(cluster.getNameNodeRpc()
      .getBlockLocations(filePath.toString(), FILE_START, FILE_SIZE)
      .getLocatedBlocks(), null));
  }


  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of blocks allocated " + blocks.size());
  }

  // all blocks belong to the same block pool
  DataNode dn = cluster.getDataNodes().get(DN_N0);
  String poolId = cluster.getNamesystem().getBlockPoolId();
  DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
  StorageBlockReport[] report =
      {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()),
          BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())};
  try {
    cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
  } catch (Exception e) {
    fail("No exception was expected. Got " + e);
  }
}
Project: RDFS    File: NameNode.java
/**
* Add new replica blocks to the inode-to-target mapping,
* and add the inode file to the DatanodeDescriptor.
*/
public void blocksBeingWrittenReport(DatanodeRegistration nodeReg,
    BlockReport blocks) throws IOException {
  verifyRequest(nodeReg);
  long[] blocksAsLong = blocks.getBlockReportInLongs();
  BlockListAsLongs blist = new BlockListAsLongs(blocksAsLong);
  namesystem.processBlocksBeingWrittenReport(nodeReg, blist);

  stateChangeLog.info("*BLOCK* NameNode.blocksBeingWrittenReport: "
      +"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks() +" blocks");
}
Project: RDFS    File: NameNode.java
protected Collection<Block> blockReportWithRetries(
    DatanodeRegistration nodeReg, BlockReport blocks) throws IOException {
  verifyRequest(nodeReg);
  myMetrics.numBlockReport.inc();
  BlockListAsLongs blist =
    new BlockListAsLongs(blocks.getBlockReportInLongs());
  stateChangeLog.debug("*BLOCK* NameNode.blockReport: " + "from "
      + nodeReg.getName() + " " + blist.getNumberOfBlocks() + " blocks");

  return namesystem.processReport(nodeReg, blist);
}
Project: RDFS    File: DataNode.java
/**
 * Sends a 'Blocks Being Written' report to the given node.
 *
 * @param node the node to send the report to
 * @throws IOException
 */
public void sendBlocksBeingWrittenReport(DatanodeProtocol node,
    int namespaceId, DatanodeRegistration nsRegistration) throws IOException {
  Block[] blocks = data.getBlocksBeingWrittenReport(namespaceId);
  if (blocks != null && blocks.length != 0) {
    long[] blocksAsLong =
      BlockListAsLongs.convertToArrayLongs(blocks);
    BlockReport bbwReport = new BlockReport(blocksAsLong);
    node.blocksBeingWrittenReport(nsRegistration, bbwReport);
  }
}
Project: hadoop-EAR    File: NameNode.java
@Override
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
    BlockReport blocks) throws IOException {
  return blockReport(nodeReg, blocks.getBlockReportInLongs());
}
Project: hops    File: BlockManager.java
/**
 * The given datanode is reporting all its blocks.
 * Update the (machine-->blocklist) and (block-->machinelist) maps.
 */
public void processReport(final DatanodeID nodeID, final String poolId,
    final BlockReport newReport) throws IOException {
  final long startTime = Time.now(); //after acquiring write lock
  final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
  if (node == null || !node.isAlive) {
    throw new IOException(
        "ProcessReport from dead or unregistered node: " + nodeID);
  }

  // To minimize startup time, we discard any second (or later) block reports
  // that we receive while still in startup phase.
  if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
    blockLog.info("BLOCK* processReport: " +
        "discarded non-initial block report from " + nodeID +
        " because namenode still in startup phase");
    return;
  }

  ReportStatistics reportStatistics = processReport(node, newReport);

  // Now that we have an up-to-date block report, we know that any
  // deletions from a previous NN iteration have been accounted for.
  boolean staleBefore = node.areBlockContentsStale();
  node.receivedBlockReport();
  if (staleBefore && !node.areBlockContentsStale()) {
    LOG.info(
        "BLOCK* processReport: Received first block report from " + node +
            " after becoming active. Its block contents are no longer" +
            " considered stale");
    rescanPostponedMisreplicatedBlocks();
  }

  final long endTime = Time.now();

  // Log the block report processing stats from Namenode perspective
  final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
  if (metrics != null) {
    metrics.addBlockReport((int) (endTime - startTime));
  }
  blockLog.info("BLOCK* processReport: from " + nodeID + ", blocks: " +
      newReport.getNumBlocks() + ", processing time: " +
      (endTime - startTime) + " ms. " + reportStatistics);
}
Project: hops    File: BPOfferService.java
/**
 * Report the list of blocks to the Namenode.
 *
 * @throws IOException
 */
DatanodeCommand blockReport() throws IOException {
  // send block report if timer has expired.
  DatanodeCommand cmd = null;
  long startTime = now();
  if (startTime - lastBlockReport > dnConf.blockReportInterval) {

    // Flush any block information that precedes the block report. Otherwise
    // we have a chance that we will miss the delHint information
    // or we will report an RBW replica after the BlockReport already reports
    // a FINALIZED one.
    reportReceivedDeletedBlocks();

    // Create block report
    long brCreateStartTime = now();
    BlockReport bReport =
        dn.getFSDataset().getBlockReport(getBlockPoolId());

    // Send block report
    long brSendStartTime = now();
    StorageBlockReport[] report = {new StorageBlockReport(
        new DatanodeStorage(bpRegistration.getStorageID()),
        bReport)};

    ActiveNode an = nextNNForBlkReport(bReport.getNumBlocks());
    if (an != null) {
      blkReportHander = getAnActor(an.getRpcServerAddressForDatanodes());
      if (blkReportHander == null || !blkReportHander.isInitialized()) {
        // No one is ready to handle the request. Return now without changing
        // lastBlockReport; the report will be retried in the next cycle.
        return null;
      }
    } else {
      LOG.warn("Unable to send block report. Current namenodes are: "+ Arrays.toString(nnList.toArray()));
      return null;
    }

    cmd =
        blkReportHander.blockReport(bpRegistration, getBlockPoolId(), report);

    // Log the block report processing stats from Datanode perspective
    long brSendCost = now() - brSendStartTime;
    long brCreateCost = brSendStartTime - brCreateStartTime;
    dn.getMetrics().addBlockReport(brSendCost);
    LOG.info(
        "BlockReport of " + bReport.getNumBlocks() + " blocks took " +
            brCreateCost + " msec to generate and " + brSendCost +
            " msecs for RPC and NN processing");

    // If we have sent the first block report, then wait a random
    // time before we start the periodic block reports.
    if (resetBlockReportTime) {
      lastBlockReport = startTime -
          DFSUtil.getRandom().nextInt((int) (dnConf.blockReportInterval));
      resetBlockReportTime = false;
    } else {
      /* say the last block report was at 8:20:14. The current report
       * should have started around 9:20:14 (default 1 hour interval).
       * If current time is :
       *   1) normal like 9:20:18, next report should be at 10:20:14
       *   2) unexpected like 11:35:43, next report should be at 12:20:14
       */
      lastBlockReport +=
          (now() - lastBlockReport) / dnConf.blockReportInterval *
              dnConf.blockReportInterval;
    }
    LOG.info("sent block report, processed command:" + cmd);
  }
  return cmd;
}
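The rescheduling arithmetic in the comment above snaps lastBlockReport to the most recent interval boundary, so a late report does not shift the schedule. A worked sketch with illustrative numbers; this is plain arithmetic, not the BPOfferService API:

// Worked example of: lastBlockReport += (now - last) / interval * interval;
static long nextGridPoint(long last, long now, long interval) {
  return last + (now - last) / interval * interval;
}
// With interval = 1 h, last = "8:20:14", now = "11:35:43" (3 h 15 m 29 s later),
// integer division yields 3, so the result is "11:20:14" and the next report
// is due at "12:20:14", matching case 2) in the comment above.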
Project: hops    File: NNThroughputBenchmark.java
BlockReport getBlockReportList() {
  return blockReportList;
}
Project: hops    File: TestBlockReport.java
/**
 * The test writes a file, verifies it, and closes it. Then a couple of
 * random blocks are removed and a block report is forced; the FSNamesystem
 * is pushed to recalculate the required DN activities, such as replication.
 * The number of missing and under-replicated blocks should be the same in
 * the case of a single-DN cluster.
 *
 * @throws IOException
 *     in case of errors
 */
@Test
public void blockReport_02() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  LOG.info("Running test " + METHOD_NAME);

  Path filePath = new Path("/" + METHOD_NAME + ".dat");
  DFSTestUtil
      .createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong());

  // mock around with newly created blocks and delete some
  File dataDir = new File(cluster.getDataDirectory());
  assertTrue(dataDir.isDirectory());

  List<ExtendedBlock> blocks2Remove = new ArrayList<>();
  List<Integer> removedIndex = new ArrayList<>();
  List<LocatedBlock> lBlocks = cluster.getNameNodeRpc()
      .getBlockLocations(filePath.toString(), FILE_START, FILE_SIZE)
      .getLocatedBlocks();

  while (removedIndex.size() != 2) {
    int newRemoveIndex = rand.nextInt(lBlocks.size());
    if (!removedIndex.contains(newRemoveIndex)) {
      removedIndex.add(newRemoveIndex);
    }
  }

  for (Integer aRemovedIndex : removedIndex) {
    blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock());
  }
  ArrayList<Block> blocks = locatedToBlocks(lBlocks, removedIndex);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of blocks allocated " + lBlocks.size());
  }

  final DataNode dn0 = cluster.getDataNodes().get(DN_N0);
  for (ExtendedBlock b : blocks2Remove) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Removing the block " + b.getBlockName());
    }
    for (File f : findAllFiles(dataDir,
        new MyFileFilter(b.getBlockName(), true))) {
      DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b);
      if (!f.delete()) {
        LOG.warn("Couldn't delete " + b.getBlockName());
      }
    }
  }

  waitTil(DN_RESCAN_EXTRA_WAIT);

  // all blocks belong to the same file, hence same BP
  String poolId = cluster.getNamesystem().getBlockPoolId();
  DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
  StorageBlockReport[] report =
      {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()),
          BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build
              ())};
  cluster.getNameNodeRpc().blockReport(dnR, poolId, report);

  BlockManagerTestUtil
      .getComputedDatanodeWork(cluster.getNamesystem().getBlockManager());

  printStats();

  assertEquals("Wrong number of MissingBlocks is found", blocks2Remove.size(),
      cluster.getNamesystem().getMissingBlocksCount());
  assertEquals("Wrong number of UnderReplicatedBlocks is found",
      blocks2Remove.size(),
      cluster.getNamesystem().getUnderReplicatedBlocks());
}
Project: hops    File: TestBlockReport.java
/**
 * The test sets the configuration parameters for a large block size and
 * restarts the single-node cluster.
 * Then it writes a file larger than the block size and closes it.
 * A second datanode is started in the cluster.
 * As soon as the replication process starts and at least one TEMPORARY
 * replica is found, the test forces a block report and checks that the
 * TEMPORARY replica is not reported in it.
 * Finally, the configuration is restored to its original state.
 *
 * @throws IOException
 *     in case of an error
 */
@Test
public void blockReport_08() throws IOException, InterruptedException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path filePath = new Path("/" + METHOD_NAME + ".dat");
  final int DN_N1 = DN_N0 + 1;
  final int bytesChkSum = 1024 * 1000;

  conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum);
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum);
  shutDownCluster();
  startUpCluster();

  try {
    ArrayList<Block> blocks =
        writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);
    Block bl = findBlock(filePath, 12 * bytesChkSum);
    BlockChecker bc = new BlockChecker(filePath);
    bc.start();

    waitForTempReplica(bl, DN_N1);

    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N1);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] report =
        {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()),
            BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())};
    cluster.getNameNodeRpc().blockReport(dnR, poolId, report);

    assertEquals("Wrong number of PendingReplication blocks", blocks.size(),
        cluster.getNamesystem().getPendingReplicationBlocks());
    printStats();

    try {
      bc.join();
    } catch (InterruptedException e) {
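      // Ignored: the test only waits for the checker thread to finish.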
    }
  } finally {
    resetConfiguration(); // return the initial state of the configuration
  }
}
Project: hops    File: TestBlockReport.java
@Test
public void blockReport_09() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  Path filePath = new Path("/" + METHOD_NAME + ".dat");
  final int DN_N1 = DN_N0 + 1;
  final int bytesChkSum = 1024 * 1000;

  conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum);
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum);
  shutDownCluster();
  startUpCluster();
  // write file and start second node to be "older" than the original

  try {
    ArrayList<Block> blocks =
        writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);

    Block bl = findBlock(filePath, 12 * bytesChkSum);
    BlockChecker bc = new BlockChecker(filePath);
    bc.start();
    corruptBlockGS(bl);
    corruptBlockLen(bl);

    waitForTempReplica(bl, DN_N1);

    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N1);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] report =
        {new StorageBlockReport(new DatanodeStorage(dnR.getStorageID()),
            BlockReport.builder(NUM_BUCKETS).addAllAsFinalized(blocks).build())};
    cluster.getNameNodeRpc().blockReport(dnR, poolId, report);

    assertEquals("Wrong number of PendingReplication blocks", 2,
        cluster.getNamesystem().getPendingReplicationBlocks());
    printStats();
    try {
      bc.join();
    } catch (InterruptedException e) {
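      // Ignored: the test only waits for the checker thread to finish.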
    }
  } finally {
    resetConfiguration(); // return the initial state of the configuration
  }
}
Project: RDFS    File: NameNode.java
@Override
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
    BlockReport blocks) throws IOException {
  return blockReport(nodeReg, blocks.getBlockReportInLongs());
}
Project: hadoop-EAR    File: DatanodeProtocols.java
/**
 * This method should not be invoked on the composite
 * DatanodeProtocols object. Call it on the individual
 * DatanodeProtocol objects instead.
 */
public void blocksBeingWrittenReport(DatanodeRegistration registration,
                                   BlockReport blocks) throws IOException {
  throw new IOException("blockReport" + errMessage);
}
Project: hadoop-EAR    File: DatanodeProtocols.java
/**
 * This method should not be invoked on the composite
 * DatanodeProtocols object. Call it on the individual
 * DatanodeProtocol objects instead.
 */
public DatanodeCommand blockReport(DatanodeRegistration registration,
                                   BlockReport blocks) throws IOException {
  throw new IOException("blockReport" + errMessage);
}
Project: hops    File: FsDatasetSpi.java
/**
 * Returns the block report, i.e. the full list of blocks stored under a
 * block pool.
 *
 * @param bpid
 *     Block Pool Id
 * @return the block report
 */
public BlockReport getBlockReport(String bpid);
Project: RDFS    File: DatanodeProtocols.java
/**
 * This method should not be invoked on the composite
 * DatanodeProtocols object. Call it on the individual
 * DatanodeProtocol objects instead.
 */
public void blocksBeingWrittenReport(DatanodeRegistration registration,
                                   BlockReport blocks) throws IOException {
  throw new IOException("blockReport" + errMessage);
}
Project: RDFS    File: DatanodeProtocols.java
/**
 * This method should not be invoked on the composite
 * DatanodeProtocols object. Call it on the individual
 * DatanodeProtocol objects instead.
 */
public DatanodeCommand blockReport(DatanodeRegistration registration,
                                   BlockReport blocks) throws IOException {
  throw new IOException("blockReport" + errMessage);
}
Project: hadoop-EAR    File: AvatarProtocol.java
public DatanodeCommand blockReportNew(DatanodeRegistration reg, BlockReport rep) throws IOException;
Project: RDFS    File: AvatarProtocol.java
public DatanodeCommand blockReportNew(DatanodeRegistration reg, BlockReport rep) throws IOException;