/**
 * Packs the supplied per-storage block reports into a
 * {@code BlockReportRequestProto}, sends it through {@code rpcProxy} and
 * converts the command carried by the reply, if there is one.
 *
 * <p>How each storage's blocks are encoded depends on whether the namespace
 * info attached to {@code registration} reports support for
 * {@code Capability.STORAGE_BLOCK_REPORT_BUFFERS}: if it does, the block count
 * and the block buffers are sent; if it does not, every long of the block list
 * is added to the report individually.
 *
 * @param registration registration placed in the request; its namespace info
 *                     selects the block encoding
 * @param poolId       block pool id placed in the request
 * @param reports      one report per storage
 * @param context      block report context placed in the request
 * @return the converted command from the response, or {@code null} when the
 *         response has no command
 * @throws IOException the exception obtained from
 *                     {@code ProtobufHelper.getRemoteException} when the proxy
 *                     call fails with a {@code ServiceException}
 */
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
    String poolId, StorageBlockReport[] reports, BlockReportContext context)
    throws IOException {
  BlockReportRequestProto.Builder request = BlockReportRequestProto.newBuilder();
  request.setRegistration(PBHelper.convert(registration));
  request.setBlockPoolId(poolId);

  final boolean buffersSupported = registration.getNamespaceInfo()
      .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);

  for (StorageBlockReport storageReport : reports) {
    StorageBlockReportProto.Builder storageProto =
        StorageBlockReportProto.newBuilder();
    storageProto.setStorage(PBHelper.convert(storageReport.getStorage()));

    BlockListAsLongs blockList = storageReport.getBlocks();
    if (!buffersSupported) {
      // Receiver lacks the buffer capability: send the plain list of longs.
      for (long encoded : blockList.getBlockListAsLongs()) {
        storageProto.addBlocks(encoded);
      }
    } else {
      storageProto.setNumberOfBlocks(blockList.getNumberOfBlocks());
      storageProto.addAllBlocksBuffers(blockList.getBlocksBuffers());
    }
    request.addReports(storageProto.build());
  }
  request.setContext(PBHelper.convert(context));

  final BlockReportResponseProto response;
  try {
    response = rpcProxy.blockReport(NULL_CONTROLLER, request.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }

  if (!response.hasCmd()) {
    return null;
  }
  return PBHelper.convert(response.getCmd());
}
/**
 * Builds a {@code BlockReportRequestProto} from the given registration, block
 * pool id, per-storage reports and context, submits it via {@code rpcProxy},
 * and returns the converted command from the response when one is present.
 *
 * <p>Each storage is converted with {@code PBHelperClient.convert}. The block
 * encoding is chosen once, up front: when the registration's namespace info
 * supports {@code Capability.STORAGE_BLOCK_REPORT_BUFFERS}, the number of
 * blocks plus the block buffers are written; otherwise the longs of the block
 * list are appended one at a time.
 *
 * @param registration registration placed in the request; its namespace info
 *                     selects the block encoding
 * @param poolId       block pool id placed in the request
 * @param reports      one report per storage
 * @param context      block report context placed in the request
 * @return the converted command from the response, or {@code null} when the
 *         response has no command
 * @throws IOException the exception obtained from
 *                     {@code ProtobufHelper.getRemoteException} when the proxy
 *                     call fails with a {@code ServiceException}
 */
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
    String poolId, StorageBlockReport[] reports, BlockReportContext context)
    throws IOException {
  BlockReportRequestProto.Builder requestBuilder =
      BlockReportRequestProto.newBuilder();
  requestBuilder.setRegistration(PBHelper.convert(registration));
  requestBuilder.setBlockPoolId(poolId);

  final boolean sendAsBuffers = registration.getNamespaceInfo()
      .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);

  for (StorageBlockReport perStorage : reports) {
    StorageBlockReportProto.Builder perStorageProto =
        StorageBlockReportProto.newBuilder();
    perStorageProto.setStorage(PBHelperClient.convert(perStorage.getStorage()));

    BlockListAsLongs storageBlocks = perStorage.getBlocks();
    if (!sendAsBuffers) {
      // Capability not reported: fall back to the plain list of longs.
      for (long blockWord : storageBlocks.getBlockListAsLongs()) {
        perStorageProto.addBlocks(blockWord);
      }
    } else {
      perStorageProto.setNumberOfBlocks(storageBlocks.getNumberOfBlocks());
      perStorageProto.addAllBlocksBuffers(storageBlocks.getBlocksBuffers());
    }
    requestBuilder.addReports(perStorageProto.build());
  }
  requestBuilder.setContext(PBHelper.convert(context));

  final BlockReportResponseProto reply;
  try {
    reply = rpcProxy.blockReport(NULL_CONTROLLER, requestBuilder.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }

  if (reply.hasCmd()) {
    return PBHelper.convert(reply.getCmd());
  }
  return null;
}
@Test public void testDatanodeDetect() throws ServiceException, IOException { final AtomicReference<BlockReportRequestProto> request = new AtomicReference<>(); // just capture the outgoing PB DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class); doAnswer(new Answer<BlockReportResponseProto>() { public BlockReportResponseProto answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); request.set((BlockReportRequestProto) args[1]); return BlockReportResponseProto.newBuilder().build(); } }).when(mockProxy).blockReport(any(RpcController.class), any(BlockReportRequestProto.class)); @SuppressWarnings("resource") DatanodeProtocolClientSideTranslatorPB nn = new DatanodeProtocolClientSideTranslatorPB(mockProxy); DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration(); NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1); reg.setNamespaceInfo(nsInfo); Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null); BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r)); DatanodeStorage storage = new DatanodeStorage("s1"); StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) }; // check DN sends new-style BR request.set(null); nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); nn.blockReport(reg, "pool", sbr, new BlockReportContext(1, 0, System.nanoTime())); BlockReportRequestProto proto = request.get(); assertNotNull(proto); assertTrue(proto.getReports(0).getBlocksList().isEmpty()); assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty()); // back up to prior version and check DN sends old-style BR request.set(null); nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); nn.blockReport(reg, "pool", sbr, new BlockReportContext(1, 0, System.nanoTime())); proto = request.get(); assertNotNull(proto); assertFalse(proto.getReports(0).getBlocksList().isEmpty()); assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty()); }
/**
 * Verifies that a {@code NamespaceInfo} created with the no-argument
 * constructor reports {@code STORAGE_BLOCK_REPORT_BUFFERS} as supported.
 */
@Test
public void testCapabilitiesInited() {
  NamespaceInfo defaultInfo = new NamespaceInfo();
  boolean buffersSupported = defaultInfo.isCapabilitySupported(
      Capability.STORAGE_BLOCK_REPORT_BUFFERS);
  assertTrue(buffersSupported);
}
@Test public void testDatanodeDetect() throws ServiceException, IOException { final AtomicReference<BlockReportRequestProto> request = new AtomicReference<>(); // just capture the outgoing PB DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class); doAnswer(new Answer<BlockReportResponseProto>() { public BlockReportResponseProto answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); request.set((BlockReportRequestProto) args[1]); return BlockReportResponseProto.newBuilder().build(); } }).when(mockProxy).blockReport(any(RpcController.class), any(BlockReportRequestProto.class)); @SuppressWarnings("resource") DatanodeProtocolClientSideTranslatorPB nn = new DatanodeProtocolClientSideTranslatorPB(mockProxy); DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration(); NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1); reg.setNamespaceInfo(nsInfo); Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null); BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r)); DatanodeStorage storage = new DatanodeStorage("s1"); StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) }; // check DN sends new-style BR request.set(null); nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); nn.blockReport(reg, "pool", sbr, new BlockReportContext(1, 0, System.nanoTime(), 0L)); BlockReportRequestProto proto = request.get(); assertNotNull(proto); assertTrue(proto.getReports(0).getBlocksList().isEmpty()); assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty()); // back up to prior version and check DN sends old-style BR request.set(null); nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); nn.blockReport(reg, "pool", sbr, new BlockReportContext(1, 0, System.nanoTime(), 0L)); proto = request.get(); assertNotNull(proto); assertFalse(proto.getReports(0).getBlocksList().isEmpty()); assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty()); }