Example source code for the Java class org.apache.hadoop.hdfs.client.BlockReportOptions

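This page collects instance usages of org.apache.hadoop.hdfs.client.BlockReportOptions from several Hadoop source trees. The class is constructed through its nested Factory builder; as a minimal sketch distilled from the snippets below (not taken verbatim from any single project):

// Build options for a manually triggered block report.
// setIncremental(false) requests a full report; true requests an incremental one.
BlockReportOptions options = new BlockReportOptions.Factory()
    .setIncremental(false)
    .build();

A superuser then hands the options to a DataNode, as the DataNode.triggerBlockReport overrides below show.
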
Project: aliyun-oss-hadoop-fs    File: TestDatanodeRegistration.java
private boolean waitForBlockReport(final DataNode dn,
    final DatanodeDescriptor dnd) throws Exception {
  final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
  final long lastCount = storage.getBlockReportCount();
  dn.triggerBlockReport(
      new BlockReportOptions.Factory().setIncremental(false).build());
  try {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return lastCount != storage.getBlockReportCount();
      }
    }, 10, 100);
  } catch (TimeoutException te) {
    return false;
  }
  return true;
}
Project: hadoop    File: BPServiceActor.java
void triggerBlockReport(BlockReportOptions options) throws IOException {
  if (options.isIncremental()) {
    LOG.info(bpos.toString() + ": scheduling an incremental block report.");
    synchronized(pendingIncrementalBRperStorage) {
      sendImmediateIBR = true;
      pendingIncrementalBRperStorage.notifyAll();
    }
  } else {
    LOG.info(bpos.toString() + ": scheduling a full block report.");
    synchronized(pendingIncrementalBRperStorage) {
      scheduler.scheduleBlockReport(0);
      pendingIncrementalBRperStorage.notifyAll();
    }
  }
}
Project: hadoop    File: DataNode.java
@Override // ClientDatanodeProtocol
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  checkSuperuserPrivilege();
  for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
    if (bpos != null) {
      for (BPServiceActor actor : bpos.getBPServiceActors()) {
        actor.triggerBlockReport(options);
      }
    }
  }
}
Project: hadoop    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  try {
    rpcProxy.triggerBlockReport(NULL_CONTROLLER,
        TriggerBlockReportRequestProto.newBuilder().
          setIncremental(options.isIncremental()).
          build());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
Project: hadoop    File: ClientDatanodeProtocolServerSideTranslatorPB.java
@Override
public TriggerBlockReportResponseProto triggerBlockReport(
    RpcController unused, TriggerBlockReportRequestProto request)
        throws ServiceException {
  try {
    impl.triggerBlockReport(new BlockReportOptions.Factory().
        setIncremental(request.getIncremental()).build());
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  return TRIGGER_BLOCK_REPORT_RESP;
}
Project: hadoop    File: DFSAdmin.java
public int triggerBlockReport(String[] argv) throws IOException {
  List<String> args = new LinkedList<String>();
  for (int j = 1; j < argv.length; j++) {
    args.add(argv[j]);
  }
  boolean incremental = StringUtils.popOption("-incremental", args);
  String hostPort = StringUtils.popFirstNonOption(args);
  if (hostPort == null) {
    System.err.println("You must specify a host:port pair.");
    return 1;
  }
  if (!args.isEmpty()) {
    System.err.print("Can't understand arguments: " +
      Joiner.on(" ").join(args) + "\n");
    return 1;
  }
  ClientDatanodeProtocol dnProxy = getDataNodeProxy(hostPort);
  try {
    dnProxy.triggerBlockReport(
        new BlockReportOptions.Factory().
            setIncremental(incremental).
            build());
  } catch (IOException e) {
    System.err.println("triggerBlockReport error: " + e);
    return 1;
  }
  System.out.println("Triggering " +
      (incremental ? "an incremental " : "a full ") +
      "block report on " + hostPort + ".");
  return 0;
}
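
This handler backs the dfsadmin command line. Assuming the stock command wiring for this handler (the command registration itself is not shown in the snippet), an invocation looks like:

hdfs dfsadmin -triggerBlockReport [-incremental] <datanode_host:ipc_port>

Omitting -incremental requests a full block report, matching the flag parsed above.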
Project: aliyun-oss-hadoop-fs    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  try {
    rpcProxy.triggerBlockReport(NULL_CONTROLLER,
        TriggerBlockReportRequestProto.newBuilder().
            setIncremental(options.isIncremental()).
            build());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
Project: aliyun-oss-hadoop-fs    File: BPServiceActor.java
void triggerBlockReport(BlockReportOptions options) throws IOException {
  if (options.isIncremental()) {
    LOG.info(bpos.toString() + ": scheduling an incremental block report.");
    synchronized(pendingIncrementalBRperStorage) {
      sendImmediateIBR = true;
      pendingIncrementalBRperStorage.notifyAll();
    }
  } else {
    LOG.info(bpos.toString() + ": scheduling a full block report.");
    synchronized(pendingIncrementalBRperStorage) {
      scheduler.forceFullBlockReportNow();
      pendingIncrementalBRperStorage.notifyAll();
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: DataNode.java
@Override // ClientDatanodeProtocol
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  checkSuperuserPrivilege();
  for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
    if (bpos != null) {
      for (BPServiceActor actor : bpos.getBPServiceActors()) {
        actor.triggerBlockReport(options);
      }
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: ClientDatanodeProtocolServerSideTranslatorPB.java
@Override
public TriggerBlockReportResponseProto triggerBlockReport(
    RpcController unused, TriggerBlockReportRequestProto request)
        throws ServiceException {
  try {
    impl.triggerBlockReport(new BlockReportOptions.Factory().
        setIncremental(request.getIncremental()).build());
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  return TRIGGER_BLOCK_REPORT_RESP;
}
Project: aliyun-oss-hadoop-fs    File: DFSAdmin.java
public int triggerBlockReport(String[] argv) throws IOException {
  List<String> args = new LinkedList<String>();
  for (int j = 1; j < argv.length; j++) {
    args.add(argv[j]);
  }
  boolean incremental = StringUtils.popOption("-incremental", args);
  String hostPort = StringUtils.popFirstNonOption(args);
  if (hostPort == null) {
    System.err.println("You must specify a host:port pair.");
    return 1;
  }
  if (!args.isEmpty()) {
    System.err.print("Can't understand arguments: " +
      Joiner.on(" ").join(args) + "\n");
    return 1;
  }
  ClientDatanodeProtocol dnProxy = getDataNodeProxy(hostPort);
  try {
    dnProxy.triggerBlockReport(
        new BlockReportOptions.Factory().
            setIncremental(incremental).
            build());
  } catch (IOException e) {
    System.err.println("triggerBlockReport error: " + e);
    return 1;
  }
  System.out.println("Triggering " +
      (incremental ? "an incremental " : "a full ") +
      "block report on " + hostPort + ".");
  return 0;
}
Project: big-c    File: BPServiceActor.java
void triggerBlockReport(BlockReportOptions options) throws IOException {
  if (options.isIncremental()) {
    LOG.info(bpos.toString() + ": scheduling an incremental block report.");
    synchronized(pendingIncrementalBRperStorage) {
      sendImmediateIBR = true;
      pendingIncrementalBRperStorage.notifyAll();
    }
  } else {
    LOG.info(bpos.toString() + ": scheduling a full block report.");
    synchronized(pendingIncrementalBRperStorage) {
      scheduler.scheduleBlockReport(0);
      pendingIncrementalBRperStorage.notifyAll();
    }
  }
}
Project: big-c    File: DataNode.java
@Override // ClientDatanodeProtocol
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  checkSuperuserPrivilege();
  for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
    if (bpos != null) {
      for (BPServiceActor actor : bpos.getBPServiceActors()) {
        actor.triggerBlockReport(options);
      }
    }
  }
}
Project: big-c    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  try {
    rpcProxy.triggerBlockReport(NULL_CONTROLLER,
        TriggerBlockReportRequestProto.newBuilder().
          setIncremental(options.isIncremental()).
          build());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
Project: big-c    File: ClientDatanodeProtocolServerSideTranslatorPB.java
@Override
public TriggerBlockReportResponseProto triggerBlockReport(
    RpcController unused, TriggerBlockReportRequestProto request)
        throws ServiceException {
  try {
    impl.triggerBlockReport(new BlockReportOptions.Factory().
        setIncremental(request.getIncremental()).build());
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  return TRIGGER_BLOCK_REPORT_RESP;
}
Project: big-c    File: DFSAdmin.java
public int triggerBlockReport(String[] argv) throws IOException {
  List<String> args = new LinkedList<String>();
  for (int j = 1; j < argv.length; j++) {
    args.add(argv[j]);
  }
  boolean incremental = StringUtils.popOption("-incremental", args);
  String hostPort = StringUtils.popFirstNonOption(args);
  if (hostPort == null) {
    System.err.println("You must specify a host:port pair.");
    return 1;
  }
  if (!args.isEmpty()) {
    System.err.print("Can't understand arguments: " +
      Joiner.on(" ").join(args) + "\n");
    return 1;
  }
  ClientDatanodeProtocol dnProxy = getDataNodeProxy(hostPort);
  try {
    dnProxy.triggerBlockReport(
        new BlockReportOptions.Factory().
            setIncremental(incremental).
            build());
  } catch (IOException e) {
    System.err.println("triggerBlockReport error: " + e);
    return 1;
  }
  System.out.println("Triggering " +
      (incremental ? "an incremental " : "a full ") +
      "block report on " + hostPort + ".");
  return 0;
}
Project: hadoop-2.6.0-cdh5.4.3    File: BPServiceActor.java
void triggerBlockReport(BlockReportOptions options) throws IOException {
  if (options.isIncremental()) {
    LOG.info(bpos.toString() + ": scheduling an incremental block report.");
    synchronized(pendingIncrementalBRperStorage) {
      sendImmediateIBR = true;
      pendingIncrementalBRperStorage.notifyAll();
    }
  } else {
    LOG.info(bpos.toString() + ": scheduling a full block report.");
    synchronized(pendingIncrementalBRperStorage) {
      lastBlockReport = 0;
      pendingIncrementalBRperStorage.notifyAll();
    }
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: DataNode.java
@Override // ClientDatanodeProtocol
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  checkSuperuserPrivilege();
  for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
    if (bpos != null) {
      for (BPServiceActor actor : bpos.getBPServiceActors()) {
        actor.triggerBlockReport(options);
      }
    }
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: ClientDatanodeProtocolTranslatorPB.java
@Override
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  try {
    rpcProxy.triggerBlockReport(NULL_CONTROLLER,
        TriggerBlockReportRequestProto.newBuilder().
          setIncremental(options.isIncremental()).
          build());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: ClientDatanodeProtocolServerSideTranslatorPB.java
@Override
public TriggerBlockReportResponseProto triggerBlockReport(
    RpcController unused, TriggerBlockReportRequestProto request)
        throws ServiceException {
  try {
    impl.triggerBlockReport(new BlockReportOptions.Factory().
        setIncremental(request.getIncremental()).build());
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  return TRIGGER_BLOCK_REPORT_RESP;
}
Project: hadoop-2.6.0-cdh5.4.3    File: DFSAdmin.java
public int triggerBlockReport(String[] argv) throws IOException {
  List<String> args = new LinkedList<String>();
  for (int j = 1; j < argv.length; j++) {
    args.add(argv[j]);
  }
  boolean incremental = StringUtils.popOption("-incremental", args);
  String hostPort = StringUtils.popFirstNonOption(args);
  if (hostPort == null) {
    System.err.println("You must specify a host:port pair.");
    return 1;
  }
  if (!args.isEmpty()) {
    System.err.print("Can't understand arguments: " +
      Joiner.on(" ").join(args) + "\n");
    return 1;
  }
  ClientDatanodeProtocol dnProxy = getDataNodeProxy(hostPort);
  try {
    dnProxy.triggerBlockReport(
        new BlockReportOptions.Factory().
            setIncremental(incremental).
            build());
  } catch (IOException e) {
    System.err.println("triggerBlockReport error: " + e);
    return 1;
  }
  System.out.println("Triggering " +
      (incremental ? "an incremental " : "a full ") +
      "block report on " + hostPort + ".");
  return 0;
}
Project: hadoop    File: TestTriggerBlockReport.java
private void testTriggerBlockReport(boolean incremental) throws Exception {
  Configuration conf = new HdfsConfiguration();

  // Set a really long value for dfs.blockreport.intervalMsec and
  // dfs.heartbeat.interval, so that incremental block reports and heartbeats
  // won't be sent during this test unless they're triggered
  // manually.
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);

  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  FileSystem fs = cluster.getFileSystem();
  DatanodeProtocolClientSideTranslatorPB spy =
      DataNodeTestUtils.spyOnBposToNN(
          cluster.getDataNodes().get(0), cluster.getNameNode());
  DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);

  // We should get 1 incremental block report.
  Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
      any(DatanodeRegistration.class),
      anyString(),
      any(StorageReceivedDeletedBlocks[].class));

  // We should not receive any more incremental or full block reports,
  // since the interval we configured is so long.
  for (int i = 0; i < 3; i++) {
    Thread.sleep(10);
    Mockito.verify(spy, times(0)).blockReport(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageBlockReport[].class),
        Mockito.<BlockReportContext>anyObject());
    Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageReceivedDeletedBlocks[].class));
  }

  // Create a fake block deletion notification on the DataNode.
  // This will be sent with the next incremental block report.
  ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
      new Block(5678, 512, 1000),  BlockStatus.DELETED_BLOCK, null);
  DataNode datanode = cluster.getDataNodes().get(0);
  BPServiceActor actor =
      datanode.getAllBpOs()[0].getBPServiceActors().get(0);
  String storageUuid =
      datanode.getFSDataset().getVolumes().get(0).getStorageID();
  actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);

  // Manually trigger a block report.
  datanode.triggerBlockReport(
      new BlockReportOptions.Factory().
          setIncremental(incremental).
          build()
  );

  // triggerBlockReport returns before the block report is
  // actually sent.  Wait for it to be sent here.
  if (incremental) {
    Mockito.verify(spy, timeout(60000).times(2)).
        blockReceivedAndDeleted(
            any(DatanodeRegistration.class),
            anyString(),
            any(StorageReceivedDeletedBlocks[].class));
  } else {
    Mockito.verify(spy, timeout(60000)).blockReport(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageBlockReport[].class),
        Mockito.<BlockReportContext>anyObject());
  }

  cluster.shutdown();
}
Project: aliyun-oss-hadoop-fs    File: TestTriggerBlockReport.java
private void testTriggerBlockReport(boolean incremental) throws Exception {
  Configuration conf = new HdfsConfiguration();

  // Set a really long value for dfs.blockreport.intervalMsec and
  // dfs.heartbeat.interval, so that incremental block reports and heartbeats
  // won't be sent during this test unless they're triggered
  // manually.
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);

  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  FileSystem fs = cluster.getFileSystem();
  DatanodeProtocolClientSideTranslatorPB spy =
      DataNodeTestUtils.spyOnBposToNN(
          cluster.getDataNodes().get(0), cluster.getNameNode());
  DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);

  // We should get 1 incremental block report.
  Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
      any(DatanodeRegistration.class),
      anyString(),
      any(StorageReceivedDeletedBlocks[].class));

  // We should not receive any more incremental or full block reports,
  // since the interval we configured is so long.
  for (int i = 0; i < 3; i++) {
    Thread.sleep(10);
    Mockito.verify(spy, times(0)).blockReport(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageBlockReport[].class),
        Mockito.<BlockReportContext>anyObject());
    Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageReceivedDeletedBlocks[].class));
  }

  // Create a fake block deletion notification on the DataNode.
  // This will be sent with the next incremental block report.
  ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
      new Block(5678, 512, 1000),  BlockStatus.DELETED_BLOCK, null);
  DataNode datanode = cluster.getDataNodes().get(0);
  BPServiceActor actor =
      datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
  String storageUuid;
  try (FsDatasetSpi.FsVolumeReferences volumes =
      datanode.getFSDataset().getFsVolumeReferences()) {
    storageUuid = volumes.get(0).getStorageID();
  }
  actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);

  // Manually trigger a block report.
  datanode.triggerBlockReport(
      new BlockReportOptions.Factory().
          setIncremental(incremental).
          build()
  );

  // triggerBlockReport returns before the block report is
  // actually sent.  Wait for it to be sent here.
  if (incremental) {
    Mockito.verify(spy, timeout(60000).times(2)).
        blockReceivedAndDeleted(
            any(DatanodeRegistration.class),
            anyString(),
            any(StorageReceivedDeletedBlocks[].class));
  } else {
    Mockito.verify(spy, timeout(60000)).blockReport(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageBlockReport[].class),
        Mockito.<BlockReportContext>anyObject());
  }

  cluster.shutdown();
}
Project: big-c    File: TestTriggerBlockReport.java
private void testTriggerBlockReport(boolean incremental) throws Exception {
  Configuration conf = new HdfsConfiguration();

  // Set a really long value for dfs.blockreport.intervalMsec and
  // dfs.heartbeat.interval, so that incremental block reports and heartbeats
  // won't be sent during this test unless they're triggered
  // manually.
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);

  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  FileSystem fs = cluster.getFileSystem();
  DatanodeProtocolClientSideTranslatorPB spy =
      DataNodeTestUtils.spyOnBposToNN(
          cluster.getDataNodes().get(0), cluster.getNameNode());
  DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);

  // We should get 1 incremental block report.
  Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
      any(DatanodeRegistration.class),
      anyString(),
      any(StorageReceivedDeletedBlocks[].class));

  // We should not receive any more incremental or full block reports,
  // since the interval we configured is so long.
  for (int i = 0; i < 3; i++) {
    Thread.sleep(10);
    Mockito.verify(spy, times(0)).blockReport(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageBlockReport[].class),
        Mockito.<BlockReportContext>anyObject());
    Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageReceivedDeletedBlocks[].class));
  }

  // Create a fake block deletion notification on the DataNode.
  // This will be sent with the next incremental block report.
  ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
      new Block(5678, 512, 1000),  BlockStatus.DELETED_BLOCK, null);
  DataNode datanode = cluster.getDataNodes().get(0);
  BPServiceActor actor =
      datanode.getAllBpOs()[0].getBPServiceActors().get(0);
  String storageUuid =
      datanode.getFSDataset().getVolumes().get(0).getStorageID();
  actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);

  // Manually trigger a block report.
  datanode.triggerBlockReport(
      new BlockReportOptions.Factory().
          setIncremental(incremental).
          build()
  );

  // triggerBlockReport returns before the block report is
  // actually sent.  Wait for it to be sent here.
  if (incremental) {
    Mockito.verify(spy, timeout(60000).times(2)).
        blockReceivedAndDeleted(
            any(DatanodeRegistration.class),
            anyString(),
            any(StorageReceivedDeletedBlocks[].class));
  } else {
    Mockito.verify(spy, timeout(60000)).blockReport(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageBlockReport[].class),
        Mockito.<BlockReportContext>anyObject());
  }

  cluster.shutdown();
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestTriggerBlockReport.java
private void testTriggerBlockReport(boolean incremental) throws Exception {
  Configuration conf = new HdfsConfiguration();

  // Set a really long value for dfs.blockreport.intervalMsec and
  // dfs.heartbeat.interval, so that incremental block reports and heartbeats
  // won't be sent during this test unless they're triggered
  // manually.
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
  conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);

  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  FileSystem fs = cluster.getFileSystem();
  DatanodeProtocolClientSideTranslatorPB spy =
      DataNodeTestUtils.spyOnBposToNN(
          cluster.getDataNodes().get(0), cluster.getNameNode());
  DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);

  // We should get 1 incremental block report.
  Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
      any(DatanodeRegistration.class),
      anyString(),
      any(StorageReceivedDeletedBlocks[].class));

  // We should not receive any more incremental or full block reports,
  // since the interval we configured is so long.
  for (int i = 0; i < 3; i++) {
    Thread.sleep(10);
    Mockito.verify(spy, times(0)).blockReport(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageBlockReport[].class),
        Mockito.<BlockReportContext>anyObject());
    Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageReceivedDeletedBlocks[].class));
  }

  // Create a fake block deletion notification on the DataNode.
  // This will be sent with the next incremental block report.
  ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
      new Block(5678, 512, 1000),  BlockStatus.DELETED_BLOCK, null);
  DataNode datanode = cluster.getDataNodes().get(0);
  BPServiceActor actor =
      datanode.getAllBpOs()[0].getBPServiceActors().get(0);
  String storageUuid =
      datanode.getFSDataset().getVolumes().get(0).getStorageID();
  actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);

  // Manually trigger a block report.
  datanode.triggerBlockReport(
      new BlockReportOptions.Factory().
          setIncremental(incremental).
          build()
  );

  // triggerBlockReport returns before the block report is
  // actually sent.  Wait for it to be sent here.
  if (incremental) {
    Mockito.verify(spy, timeout(60000).times(2)).
        blockReceivedAndDeleted(
            any(DatanodeRegistration.class),
            anyString(),
            any(StorageReceivedDeletedBlocks[].class));
  } else {
    Mockito.verify(spy, timeout(60000)).blockReport(
        any(DatanodeRegistration.class),
        anyString(),
        any(StorageBlockReport[].class),
        Mockito.<BlockReportContext>anyObject());
  }

  cluster.shutdown();
}
Project: hadoop    File: ClientDatanodeProtocol.java
/**
 * Trigger a new block report.
 */
void triggerBlockReport(BlockReportOptions options)
  throws IOException;
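
For callers other than DFSAdmin, a minimal sketch of invoking this protocol directly, assuming a ClientDatanodeProtocol proxy for the target DataNode has already been obtained (for example the way getDataNodeProxy does in the DFSAdmin snippets above):

// dnProxy: an assumed, already-constructed ClientDatanodeProtocol proxy.
dnProxy.triggerBlockReport(
    new BlockReportOptions.Factory()
        .setIncremental(true)  // request an incremental report
        .build());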
Project: aliyun-oss-hadoop-fs    File: ClientDatanodeProtocol.java
/**
 * Trigger a new block report.
 */
void triggerBlockReport(BlockReportOptions options)
  throws IOException;
Project: big-c    File: ClientDatanodeProtocol.java
/**
 * Trigger a new block report.
 */
void triggerBlockReport(BlockReportOptions options)
  throws IOException;
Project: hadoop-2.6.0-cdh5.4.3    File: ClientDatanodeProtocol.java
/**
 * Trigger a new block report.
 */
void triggerBlockReport(BlockReportOptions options)
  throws IOException;