@Override public void blockReceivedAndDeleted(DatanodeRegistration registration, String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { BlockReceivedAndDeletedRequestProto.Builder builder = BlockReceivedAndDeletedRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setBlockPoolId(poolId); for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) { StorageReceivedDeletedBlocksProto.Builder repBuilder = StorageReceivedDeletedBlocksProto.newBuilder(); repBuilder.setStorageUuid(storageBlock.getStorage().getStorageID()); // Set for wire compatibility. repBuilder.setStorage(PBHelper.convert(storageBlock.getStorage())); for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) { repBuilder.addBlocks(PBHelper.convert(rdBlock)); } builder.addBlocks(repBuilder.build()); } try { rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build()); } catch (ServiceException se) { throw ProtobufHelper.getRemoteException(se); } }
/**
 * Adds {@code blocksPerFile} blocks to the given file and, for every
 * location returned for a block, records the block on the matching
 * {@code datanodes} entry and reports it as received.
 *
 * NOTE(review): the index comes from {@code Arrays.binarySearch}, which
 * needs a sorted array and yields a negative value when the key is
 * absent; the result is used unchecked here.
 *
 * @param fileName file the blocks are added to
 * @param clientName client name passed to {@code addBlock}
 * @return the last block added, or {@code null} if no block was added
 * @throws IOException if a call on {@code nameNodeProto} fails
 */
private ExtendedBlock addBlocks(String fileName, String clientName)
    throws IOException {
  ExtendedBlock prevBlock = null;
  for (int blockNo = 0; blockNo < blocksPerFile; blockNo++) {
    final LocatedBlock located = nameNodeProto.addBlock(
        fileName, clientName, prevBlock, null,
        INodeId.GRANDFATHER_INODE_ID, null);
    prevBlock = located.getBlock();

    for (DatanodeInfo target : located.getLocations()) {
      final int dnIdx =
          Arrays.binarySearch(datanodes, target.getXferAddr());
      datanodes[dnIdx].addBlock(located.getBlock().getLocalBlock());

      // A single-entry report announcing the freshly added block.
      final ReceivedDeletedBlockInfo received = new ReceivedDeletedBlockInfo(
          located.getBlock().getLocalBlock(),
          ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
          null);
      final ReceivedDeletedBlockInfo[] rdBlocks = { received };
      final StorageReceivedDeletedBlocks[] report = {
          new StorageReceivedDeletedBlocks(
              datanodes[dnIdx].storage.getStorageID(), rdBlocks)
      };
      nameNodeProto.blockReceivedAndDeleted(
          datanodes[dnIdx].dnRegistration,
          located.getBlock().getBlockPoolId(),
          report);
    }
  }
  return prevBlock;
}
/**
 * Polls until {@code mockNN} has seen a {@code blockReceivedAndDeleted}
 * call for the block pool of {@code fakeBlock}, then returns the blocks
 * of the first storage entry captured from that call.
 *
 * @param fakeBlock block whose pool id the call must match
 * @param mockNN mock to verify against
 * @return blocks from element 0 of the captured report array
 * @throws Exception propagated from {@code GenericTestUtils.waitFor}
 */
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
    final ExtendedBlock fakeBlock,
    final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
  final String expectedPoolId = fakeBlock.getBlockPoolId();
  final ArgumentCaptor<StorageReceivedDeletedBlocks[]> reportCaptor =
      ArgumentCaptor.forClass(StorageReceivedDeletedBlocks[].class);

  final Supplier<Boolean> reportSeen = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      boolean verified;
      try {
        Mockito.verify(mockNN).blockReceivedAndDeleted(
            Mockito.<DatanodeRegistration>anyObject(),
            Mockito.eq(expectedPoolId),
            reportCaptor.capture());
        verified = true;
      } catch (Throwable notYet) {
        // Any failure (including a failed verification) means "keep polling".
        verified = false;
      }
      return verified;
    }
  };
  GenericTestUtils.waitFor(reportSeen, 100, 10000);

  final StorageReceivedDeletedBlocks[] captured = reportCaptor.getValue();
  return captured[0].getBlocks();
}
/** * Ensure that an IBR is generated immediately for a block received by * the DN. * * @throws InterruptedException * @throws IOException */ @Test (timeout=60000) public void testReportBlockReceived() throws InterruptedException, IOException { try { DatanodeProtocolClientSideTranslatorPB nnSpy = spyOnDnCallsToNn(); injectBlockReceived(); // Sleep for a very short time, this is necessary since the IBR is // generated asynchronously. Thread.sleep(2000); // Ensure that the received block was reported immediately. Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( any(DatanodeRegistration.class), anyString(), any(StorageReceivedDeletedBlocks[].class)); } finally { cluster.shutdown(); cluster = null; } }
/**
 * Handles an incremental block report from the given node: blocks that
 * are starting to be received, have completed being received, or were
 * deleted.
 *
 * The caller must hold the FSNamesystem lock; the write lock is asserted
 * on entry.
 *
 * @param nodeID id used to look up the reporting datanode
 * @param srdb the report to process
 * @throws IOException if the node is unknown or not registered, or if
 *         processing the report fails (in which case the node is first
 *         flagged for forced registration)
 */
public void processIncrementalBlockReport(final DatanodeID nodeID,
    final StorageReceivedDeletedBlocks srdb) throws IOException {
  assert namesystem.hasWriteLock();

  final DatanodeDescriptor reporter = datanodeManager.getDatanode(nodeID);
  final boolean knownAndRegistered =
      reporter != null && reporter.isRegistered();
  if (!knownAndRegistered) {
    blockLog.warn("BLOCK* processIncrementalBlockReport"
        + " is received from dead or unregistered node {}", nodeID);
    throw new IOException(
        "Got incremental block report from unregistered or dead node");
  }

  try {
    processIncrementalBlockReport(reporter, srdb);
  } catch (Exception failure) {
    // Flag the node for forced registration before the failure propagates.
    reporter.setForceRegistration(true);
    throw failure;
  }
}
@Override public void blockReceivedAndDeleted(DatanodeRegistration registration, String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { BlockReceivedAndDeletedRequestProto.Builder builder = BlockReceivedAndDeletedRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setBlockPoolId(poolId); for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) { StorageReceivedDeletedBlocksProto.Builder repBuilder = StorageReceivedDeletedBlocksProto.newBuilder(); repBuilder.setStorageUuid(storageBlock.getStorage().getStorageID()); // Set for wire compatibility. repBuilder.setStorage(PBHelperClient.convert(storageBlock.getStorage())); for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) { repBuilder.addBlocks(PBHelper.convert(rdBlock)); } builder.addBlocks(repBuilder.build()); } try { rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build()); } catch (ServiceException se) { throw ProtobufHelper.getRemoteException(se); } }
/**
 * Adds {@code blocksPerFile} blocks to the given file through
 * {@code clientProto} and, for each returned location, records the block
 * on the matching {@code datanodes} entry and reports it as received
 * through {@code dataNodeProto}.
 *
 * The {@code datanodes} index for a location is its transfer port minus
 * one.
 *
 * @param fileName file the blocks are added to
 * @param clientName client name passed to {@code addBlock}
 * @return the last block added, or {@code null} if no block was added
 * @throws IOException if a protocol call fails
 */
private ExtendedBlock addBlocks(String fileName, String clientName)
    throws IOException {
  ExtendedBlock prevBlock = null;
  for (int blockNo = 0; blockNo < blocksPerFile; blockNo++) {
    final LocatedBlock located = clientProto.addBlock(
        fileName, clientName, prevBlock, null,
        HdfsConstants.GRANDFATHER_INODE_ID, null);
    prevBlock = located.getBlock();

    for (DatanodeInfo target : located.getLocations()) {
      final int dnIdx = target.getXferPort() - 1;
      datanodes[dnIdx].addBlock(located.getBlock().getLocalBlock());

      // A single-entry report announcing the freshly added block.
      final ReceivedDeletedBlockInfo received = new ReceivedDeletedBlockInfo(
          located.getBlock().getLocalBlock(),
          ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
          null);
      final ReceivedDeletedBlockInfo[] rdBlocks = { received };
      final StorageReceivedDeletedBlocks[] report = {
          new StorageReceivedDeletedBlocks(
              datanodes[dnIdx].storage.getStorageID(), rdBlocks)
      };
      dataNodeProto.blockReceivedAndDeleted(
          datanodes[dnIdx].dnRegistration, bpid, report);
    }
  }
  return prevBlock;
}
/**
 * Simulates transferring blocks to other data-nodes: nothing is copied,
 * the method only reports, on behalf of each target, that the block has
 * been received.
 *
 * @param blocks blocks being "transferred"
 * @param xferTargets for each block, the data-nodes it goes to
 * @param targetStorageIDs for each block and target, the storage id used
 *        in the report
 * @return {@code blocks.length}
 * @throws IOException if reporting to the name-node fails
 */
private int transferBlocks(Block blocks[],
    DatanodeInfo xferTargets[][],
    String targetStorageIDs[][]) throws IOException {
  for (int blockIdx = 0; blockIdx < blocks.length; blockIdx++) {
    final Block block = blocks[blockIdx];
    final DatanodeInfo[] targets = xferTargets[blockIdx];

    for (int targetIdx = 0; targetIdx < targets.length; targetIdx++) {
      final DatanodeInfo target = targets[targetIdx];
      final String storageId = targetStorageIDs[blockIdx][targetIdx];

      // Registration built for the target so the report appears to come
      // from it.
      final DatanodeRegistration targetReg = new DatanodeRegistration(
          target, new DataStorage(nsInfo),
          new ExportedBlockKeys(), VersionInfo.getVersion());

      final ReceivedDeletedBlockInfo[] rdBlocks = {
          new ReceivedDeletedBlockInfo(block,
              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null)
      };
      final StorageReceivedDeletedBlocks[] report = {
          new StorageReceivedDeletedBlocks(storageId, rdBlocks)
      };
      nameNodeProto.blockReceivedAndDeleted(
          targetReg, nameNode.getNamesystem().getBlockPoolId(), report);
    }
  }
  return blocks.length;
}
/**
 * Converts an incremental block report into a
 * {@code BlockReceivedAndDeletedRequestProto} and sends it through the
 * RPC proxy.
 *
 * @param registration datanode registration, converted with {@code PBHelper}
 * @param poolId block pool id set on the request
 * @param receivedAndDeletedBlocks per-storage block updates, each carrying
 *        its storage id
 * @throws IOException the exception obtained from
 *         {@code ProtobufHelper.getRemoteException} when the RPC call
 *         raises a {@code ServiceException}
 */
@Override
public void blockReceivedAndDeleted(DatanodeRegistration registration,
    String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
    throws IOException {
  final BlockReceivedAndDeletedRequestProto.Builder request =
      BlockReceivedAndDeletedRequestProto.newBuilder();
  request.setRegistration(PBHelper.convert(registration));
  request.setBlockPoolId(poolId);

  for (StorageReceivedDeletedBlocks perStorage : receivedAndDeletedBlocks) {
    final StorageReceivedDeletedBlocksProto.Builder storageProto =
        StorageReceivedDeletedBlocksProto.newBuilder();
    storageProto.setStorageID(perStorage.getStorageID());
    for (ReceivedDeletedBlockInfo blockInfo : perStorage.getBlocks()) {
      storageProto.addBlocks(PBHelper.convert(blockInfo));
    }
    request.addBlocks(storageProto.build());
  }

  final BlockReceivedAndDeletedRequestProto message = request.build();
  try {
    rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, message);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
/**
 * Server-side entry point: unpacks the protobuf request into
 * {@code StorageReceivedDeletedBlocks} objects and forwards them to the
 * wrapped implementation.
 *
 * @param controller RPC controller (not used by this method)
 * @param request the incoming protobuf request
 * @return the shared void response constant
 * @throws ServiceException wrapping any {@code IOException} raised while
 *         forwarding the call
 */
@Override
public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
    RpcController controller, BlockReceivedAndDeletedRequestProto request)
    throws ServiceException {
  final List<StorageReceivedDeletedBlocksProto> storageProtos =
      request.getBlocksList();
  final StorageReceivedDeletedBlocks[] reports =
      new StorageReceivedDeletedBlocks[storageProtos.size()];

  int reportIdx = 0;
  for (StorageReceivedDeletedBlocksProto storageProto : storageProtos) {
    final List<ReceivedDeletedBlockInfoProto> blockProtos =
        storageProto.getBlocksList();
    final ReceivedDeletedBlockInfo[] blocks =
        new ReceivedDeletedBlockInfo[blockProtos.size()];

    int blockIdx = 0;
    for (ReceivedDeletedBlockInfoProto blockProto : blockProtos) {
      blocks[blockIdx++] = PBHelper.convert(blockProto);
    }
    reports[reportIdx++] =
        new StorageReceivedDeletedBlocks(storageProto.getStorageID(), blocks);
  }

  try {
    impl.blockReceivedAndDeleted(
        PBHelper.convert(request.getRegistration()),
        request.getBlockPoolId(),
        reports);
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
  return VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE;
}
/**
 * Simulates transferring blocks to other data-nodes: nothing is copied,
 * the method only reports, on behalf of each target, that the block has
 * been received.
 *
 * @param blocks blocks being "transferred"
 * @param xferTargets for each block, the data-nodes it goes to
 * @return {@code blocks.length}
 * @throws IOException if reporting to the name-node fails
 */
private int transferBlocks(Block blocks[],
    DatanodeInfo xferTargets[][]) throws IOException {
  for (int blockIdx = 0; blockIdx < blocks.length; blockIdx++) {
    final Block block = blocks[blockIdx];
    final DatanodeInfo[] targets = xferTargets[blockIdx];

    for (int targetIdx = 0; targetIdx < targets.length; targetIdx++) {
      final DatanodeInfo target = targets[targetIdx];

      // Registration built for the target so the report appears to come
      // from it; its storage carries the target's storage id.
      final DatanodeRegistration targetReg = new DatanodeRegistration(
          target,
          new DataStorage(nsInfo, target.getStorageID()),
          new ExportedBlockKeys(),
          VersionInfo.getVersion());

      final ReceivedDeletedBlockInfo[] rdBlocks = {
          new ReceivedDeletedBlockInfo(block,
              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null)
      };
      final StorageReceivedDeletedBlocks[] report = {
          new StorageReceivedDeletedBlocks(targetReg.getStorageID(), rdBlocks)
      };
      nameNodeProto.blockReceivedAndDeleted(
          targetReg, nameNode.getNamesystem().getBlockPoolId(), report);
    }
  }
  return blocks.length;
}
/**
 * Adds {@code blocksPerFile} blocks to the given file and, for every
 * location returned for a block, records the block on the matching
 * {@code datanodes} entry and reports it as received, using the storage
 * id from that entry's registration.
 *
 * NOTE(review): the index comes from {@code Arrays.binarySearch}, which
 * needs a sorted array and yields a negative value when the key is
 * absent; the result is used unchecked here.
 *
 * @param fileName file the blocks are added to
 * @param clientName client name passed to {@code addBlock}
 * @return the last block added, or {@code null} if no block was added
 * @throws IOException if a call on {@code nameNodeProto} fails
 */
private ExtendedBlock addBlocks(String fileName, String clientName)
    throws IOException {
  ExtendedBlock prevBlock = null;
  for (int blockNo = 0; blockNo < blocksPerFile; blockNo++) {
    final LocatedBlock located = nameNodeProto.addBlock(
        fileName, clientName, prevBlock, null,
        INodeId.GRANDFATHER_INODE_ID, null);
    prevBlock = located.getBlock();

    for (DatanodeInfo target : located.getLocations()) {
      final int dnIdx =
          Arrays.binarySearch(datanodes, target.getXferAddr());
      datanodes[dnIdx].addBlock(located.getBlock().getLocalBlock());

      // A single-entry report announcing the freshly added block.
      final ReceivedDeletedBlockInfo received = new ReceivedDeletedBlockInfo(
          located.getBlock().getLocalBlock(),
          ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
          null);
      final ReceivedDeletedBlockInfo[] rdBlocks = { received };
      final StorageReceivedDeletedBlocks[] report = {
          new StorageReceivedDeletedBlocks(
              datanodes[dnIdx].dnRegistration.getStorageID(), rdBlocks)
      };
      nameNodeProto.blockReceivedAndDeleted(
          datanodes[dnIdx].dnRegistration,
          located.getBlock().getBlockPoolId(),
          report);
    }
  }
  return prevBlock;
}
/**
 * Polls until {@code mockNN} has seen a {@code blockReceivedAndDeleted}
 * call for the block pool of {@code fakeBlock}, then returns the blocks
 * of the first storage entry captured from that call.
 *
 * The verification uses the mock and block passed in rather than fixed
 * fields, so the helper can wait on whichever NameNode mock the caller
 * names.
 *
 * @param fakeBlock block whose pool id the call must match
 * @param mockNN mock to verify against
 * @return blocks from element 0 of the captured report array
 * @throws Exception propagated from {@code GenericTestUtils.waitFor}
 */
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
    final ExtendedBlock fakeBlock,
    final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
  final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
  final ArgumentCaptor<StorageReceivedDeletedBlocks[]> captor =
      ArgumentCaptor.forClass(StorageReceivedDeletedBlocks[].class);
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      try {
        Mockito.verify(mockNN).blockReceivedAndDeleted(
            Mockito.<DatanodeRegistration>anyObject(),
            Mockito.eq(fakeBlockPoolId),
            captor.capture());
        return true;
      } catch (Throwable t) {
        // Any failure (including a failed verification) means the call
        // has not been observed yet, so keep polling.
        return false;
      }
    }
  }, 100, 10000);
  return captor.getValue()[0].getBlocks();
}
/**
 * Simulates transferring blocks to other data-nodes: nothing is copied,
 * the method only reports, on behalf of each target, that the block has
 * been received.
 *
 * @param blocks blocks being "transferred"
 * @param xferTargets for each block, the data-nodes it goes to
 * @return {@code blocks.length}
 * @throws IOException if reporting to the name-node fails
 */
private int transferBlocks(Block blocks[], DatanodeInfo xferTargets[][])
    throws IOException {
  for (int blockIdx = 0; blockIdx < blocks.length; blockIdx++) {
    final Block block = blocks[blockIdx];
    final DatanodeInfo[] targets = xferTargets[blockIdx];

    for (DatanodeInfo target : targets) {
      // Registration built for the target so the report appears to come
      // from it; its storage carries the target's storage id.
      final DatanodeRegistration targetReg = new DatanodeRegistration(
          target,
          new DataStorage(nsInfo, target.getStorageID()),
          new ExportedBlockKeys(),
          VersionInfo.getVersion());

      final ReceivedDeletedBlockInfo[] rdBlocks = {
          new ReceivedDeletedBlockInfo(block,
              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED, null)
      };
      final StorageReceivedDeletedBlocks[] report = {
          new StorageReceivedDeletedBlocks(targetReg.getStorageID(), rdBlocks)
      };
      nameNodeProto.blockReceivedAndDeleted(
          targetReg, nameNode.getNamesystem().getBlockPoolId(), report);
    }
  }
  return blocks.length;
}
/**
 * Adds {@code blocksPerFile} blocks to the given file and, for every
 * location returned for a block, records the block on the matching
 * {@code datanodes} entry and reports it as received, using the storage
 * id from that entry's registration.
 *
 * NOTE(review): the index comes from {@code Arrays.binarySearch}, which
 * needs a sorted array and yields a negative value when the key is
 * absent; the result is used unchecked here.
 *
 * @param fileName file the blocks are added to
 * @param clientName client name passed to {@code addBlock}
 * @return the last block added, or {@code null} if no block was added
 * @throws IOException if a call on {@code nameNodeProto} fails
 */
private ExtendedBlock addBlocks(String fileName, String clientName)
    throws IOException {
  ExtendedBlock prevBlock = null;
  for (int blockNo = 0; blockNo < blocksPerFile; blockNo++) {
    final LocatedBlock located =
        nameNodeProto.addBlock(fileName, clientName, prevBlock, null);
    prevBlock = located.getBlock();

    for (DatanodeInfo target : located.getLocations()) {
      final int dnIdx =
          Arrays.binarySearch(datanodes, target.getXferAddr());
      datanodes[dnIdx].addBlock(located.getBlock().getLocalBlock());

      // A single-entry report announcing the freshly added block.
      final ReceivedDeletedBlockInfo received = new ReceivedDeletedBlockInfo(
          located.getBlock().getLocalBlock(),
          ReceivedDeletedBlockInfo.BlockStatus.RECEIVED,
          null);
      final ReceivedDeletedBlockInfo[] rdBlocks = { received };
      final StorageReceivedDeletedBlocks[] report = {
          new StorageReceivedDeletedBlocks(
              datanodes[dnIdx].dnRegistration.getStorageID(), rdBlocks)
      };
      nameNodeProto.blockReceivedAndDeleted(
          datanodes[dnIdx].dnRegistration,
          located.getBlock().getBlockPoolId(),
          report);
    }
  }
  return prevBlock;
}