@Test public void testDatanodeDetect() throws ServiceException, IOException { final AtomicReference<BlockReportRequestProto> request = new AtomicReference<>(); // just capture the outgoing PB DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class); doAnswer(new Answer<BlockReportResponseProto>() { public BlockReportResponseProto answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); request.set((BlockReportRequestProto) args[1]); return BlockReportResponseProto.newBuilder().build(); } }).when(mockProxy).blockReport(any(RpcController.class), any(BlockReportRequestProto.class)); @SuppressWarnings("resource") DatanodeProtocolClientSideTranslatorPB nn = new DatanodeProtocolClientSideTranslatorPB(mockProxy); DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration(); NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1); reg.setNamespaceInfo(nsInfo); Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null); BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r)); DatanodeStorage storage = new DatanodeStorage("s1"); StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) }; // check DN sends new-style BR request.set(null); nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); nn.blockReport(reg, "pool", sbr, new BlockReportContext(1, 0, System.nanoTime())); BlockReportRequestProto proto = request.get(); assertNotNull(proto); assertTrue(proto.getReports(0).getBlocksList().isEmpty()); assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty()); // back up to prior version and check DN sends old-style BR request.set(null); nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); nn.blockReport(reg, "pool", sbr, new BlockReportContext(1, 0, System.nanoTime())); proto = request.get(); assertNotNull(proto); assertFalse(proto.getReports(0).getBlocksList().isEmpty()); assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty()); }
@Test public void testDatanodeDetect() throws ServiceException, IOException { final AtomicReference<BlockReportRequestProto> request = new AtomicReference<>(); // just capture the outgoing PB DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class); doAnswer(new Answer<BlockReportResponseProto>() { public BlockReportResponseProto answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); request.set((BlockReportRequestProto) args[1]); return BlockReportResponseProto.newBuilder().build(); } }).when(mockProxy).blockReport(any(RpcController.class), any(BlockReportRequestProto.class)); @SuppressWarnings("resource") DatanodeProtocolClientSideTranslatorPB nn = new DatanodeProtocolClientSideTranslatorPB(mockProxy); DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration(); NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1); reg.setNamespaceInfo(nsInfo); Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null); BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r)); DatanodeStorage storage = new DatanodeStorage("s1"); StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) }; // check DN sends new-style BR request.set(null); nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); nn.blockReport(reg, "pool", sbr, new BlockReportContext(1, 0, System.nanoTime(), 0L)); BlockReportRequestProto proto = request.get(); assertNotNull(proto); assertTrue(proto.getReports(0).getBlocksList().isEmpty()); assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty()); // back up to prior version and check DN sends old-style BR request.set(null); nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); nn.blockReport(reg, "pool", sbr, new BlockReportContext(1, 0, System.nanoTime(), 0L)); proto = request.get(); assertNotNull(proto); assertFalse(proto.getReports(0).getBlocksList().isEmpty()); assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty()); }