/**
 * Round-trips a two-element {@link BlocksWithLocations} through
 * {@code PBHelper.convert} in both directions, once for each boolean value
 * passed as the second argument of {@code getBlockWithLocations}, and
 * compares the resulting blocks pairwise with the originals.
 */
@Test
public void testConvertBlocksWithLocations() {
  final boolean[] flags = {false, true};
  for (boolean flag : flags) {
    final BlockWithLocations first = getBlockWithLocations(1, flag);
    final BlockWithLocations second = getBlockWithLocations(2, flag);
    final BlocksWithLocations original =
        new BlocksWithLocations(new BlockWithLocations[] {first, second});

    // Object -> proto -> object.
    final BlocksWithLocationsProto proto = PBHelper.convert(original);
    final BlocksWithLocations restored = PBHelper.convert(proto);

    final BlockWithLocations[] expected = original.getBlocks();
    final BlockWithLocations[] actual = restored.getBlocks();
    assertEquals(expected.length, actual.length);
    int idx = 0;
    while (idx < expected.length) {
      compare(expected[idx], actual[idx]);
      idx++;
    }
  }
}
@Override // NamenodeProtocol public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException { if(size <= 0) { throw new IllegalArgumentException( "Unexpected not positive size: "+size); } checkNNStartup(); namesystem.checkSuperuserPrivilege(); return namesystem.getBlockManager().getBlocks(datanode, size); }
/** * Fetch new blocks of this source from namenode and update this source's * block list & {@link Dispatcher#globalBlocks}. * * @return the total size of the received blocks in the number of bytes. */ private long getBlockList() throws IOException { final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); long bytesReceived = 0; for (BlockWithLocations blk : newBlocks.getBlocks()) { bytesReceived += blk.getBlock().getNumBytes(); synchronized (globalBlocks) { final DBlock block = globalBlocks.get(blk.getBlock()); synchronized (block) { block.clearLocations(); // update locations final String[] datanodeUuids = blk.getDatanodeUuids(); final StorageType[] storageTypes = blk.getStorageTypes(); for (int i = 0; i < datanodeUuids.length; i++) { final StorageGroup g = storageGroupMap.get( datanodeUuids[i], storageTypes[i]); if (g != null) { // not unknown block.addLocation(g); } } } if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) { // filter bad candidates srcBlocks.add(block); } } } return bytesReceived; }
/**
 * Returns blocks, together with their locations, on {@code datanode}.
 * The lookup runs while holding the namesystem read lock; the READ
 * operation check is performed once before acquiring the lock and once
 * more after it is held. The lock is always released.
 *
 * @param datanode datanode on which the blocks are located
 * @param size total size of the blocks
 * @return the result of {@code getBlocksWithLocations(datanode, size)}
 * @throws IOException declared; may be raised by the checks or the lookup
 */
public BlocksWithLocations getBlocks(DatanodeID datanode, long size)
    throws IOException {
  final OperationCategory category = OperationCategory.READ;
  namesystem.checkOperation(category);
  namesystem.readLock();
  try {
    namesystem.checkOperation(category);
    final BlocksWithLocations found = getBlocksWithLocations(datanode, size);
    return found;
  } finally {
    namesystem.readUnlock();
  }
}
/**
 * Builds a {@link BlocksWithLocationsProto} from a
 * {@link BlocksWithLocations} by converting each contained block, in array
 * order, and adding it to the message builder.
 *
 * @param blks blocks to convert
 * @return the built protobuf message
 */
public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
  final BlocksWithLocationsProto.Builder out =
      BlocksWithLocationsProto.newBuilder();
  final BlockWithLocations[] source = blks.getBlocks();
  for (int idx = 0; idx < source.length; idx++) {
    out.addBlocks(convert(source[idx]));
  }
  return out.build();
}
/**
 * Builds a {@link BlocksWithLocations} from its protobuf message by
 * converting every entry of the message's block list, preserving order.
 *
 * @param blocks protobuf message to convert
 * @return a new {@link BlocksWithLocations} holding the converted blocks
 */
public static BlocksWithLocations convert(BlocksWithLocationsProto blocks) {
  final List<BlockWithLocationsProto> protos = blocks.getBlocksList();
  final int count = protos.size();
  final BlockWithLocations[] converted = new BlockWithLocations[count];
  for (int idx = 0; idx < count; idx++) {
    converted[idx] = convert(protos.get(idx));
  }
  return new BlocksWithLocations(converted);
}
/**
 * Handles a getBlocks request: converts the request's datanode, calls the
 * wrapped implementation, and packs the converted result into a response.
 *
 * @param unused RPC controller; not referenced in this method
 * @param request request supplying the datanode and the size
 * @return response whose blocks field holds the converted result
 * @throws ServiceException wrapping any {@link IOException} thrown by the
 *         implementation
 */
@Override
public GetBlocksResponseProto getBlocks(RpcController unused,
    GetBlocksRequestProto request) throws ServiceException {
  final DatanodeInfo target =
      new DatanodeInfo(PBHelper.convert(request.getDatanode()));
  final BlocksWithLocations result;
  try {
    result = impl.getBlocks(target, request.getSize());
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
  final GetBlocksResponseProto.Builder response =
      GetBlocksResponseProto.newBuilder();
  response.setBlocks(PBHelper.convert(result));
  return response.build();
}
/**
 * Sends a getBlocks request through the RPC proxy and converts the blocks
 * in the response back from protobuf form.
 *
 * @param datanode datanode to put in the request (converted as a
 *        {@code DatanodeID})
 * @param size size to put in the request
 * @return the converted blocks from the response
 * @throws IOException the exception obtained from
 *         {@code ProtobufHelper.getRemoteException} when the proxy call
 *         throws a {@code ServiceException}
 */
@Override
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
    throws IOException {
  final GetBlocksRequestProto.Builder request =
      GetBlocksRequestProto.newBuilder();
  request.setDatanode(PBHelper.convert((DatanodeID) datanode));
  request.setSize(size);
  final GetBlocksRequestProto req = request.build();
  try {
    return PBHelper.convert(
        rpcProxy.getBlocks(NULL_CONTROLLER, req).getBlocks());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Round-trips a two-element {@link BlocksWithLocations} through
 * {@code PBHelper.convert} in both directions and compares the resulting
 * blocks pairwise with the originals.
 */
@Test
public void testConvertBlocksWithLocations() {
  final BlockWithLocations first = getBlockWithLocations(1);
  final BlockWithLocations second = getBlockWithLocations(2);
  final BlocksWithLocations original =
      new BlocksWithLocations(new BlockWithLocations[] {first, second});

  // Object -> proto -> object.
  final BlocksWithLocations restored =
      PBHelper.convert(PBHelper.convert(original));

  final BlockWithLocations[] expected = original.getBlocks();
  final BlockWithLocations[] actual = restored.getBlocks();
  assertEquals(expected.length, actual.length);
  int idx = 0;
  while (idx < expected.length) {
    compare(expected[idx], actual[idx]);
    idx++;
  }
}
/**
 * Handles a getBlocks request: converts the request's datanode with
 * {@code PBHelperClient}, calls the wrapped implementation, and packs the
 * converted result into a response.
 *
 * @param unused RPC controller; not referenced in this method
 * @param request request supplying the datanode and the size
 * @return response whose blocks field holds the converted result
 * @throws ServiceException wrapping any {@link IOException} thrown by the
 *         implementation
 */
@Override
public GetBlocksResponseProto getBlocks(RpcController unused,
    GetBlocksRequestProto request) throws ServiceException {
  final DatanodeInfo target =
      new DatanodeInfo(PBHelperClient.convert(request.getDatanode()));
  final BlocksWithLocations result;
  try {
    result = impl.getBlocks(target, request.getSize());
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
  final GetBlocksResponseProto.Builder response =
      GetBlocksResponseProto.newBuilder();
  response.setBlocks(PBHelper.convert(result));
  return response.build();
}
/**
 * Sends a getBlocks request through the RPC proxy and converts the blocks
 * in the response back from protobuf form. The datanode is converted with
 * {@code PBHelperClient}; the response blocks with {@code PBHelper}.
 *
 * @param datanode datanode to put in the request (converted as a
 *        {@code DatanodeID})
 * @param size size to put in the request
 * @return the converted blocks from the response
 * @throws IOException the exception obtained from
 *         {@code ProtobufHelper.getRemoteException} when the proxy call
 *         throws a {@code ServiceException}
 */
@Override
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
    throws IOException {
  final GetBlocksRequestProto.Builder request =
      GetBlocksRequestProto.newBuilder();
  request.setDatanode(PBHelperClient.convert((DatanodeID) datanode));
  request.setSize(size);
  final GetBlocksRequestProto req = request.build();
  try {
    return PBHelper.convert(
        rpcProxy.getBlocks(NULL_CONTROLLER, req).getBlocks());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Forwards the request to {@code namenode.getBlocks}, wrapped in an
 * {@code ImmutableFSCaller} created from {@code failoverHandler} and run
 * through its {@code callFS()} method.
 * NOTE(review): presumably {@code callFS()} adds failover handling around
 * the call; confirm against the failover handler.
 *
 * @param datanode datanode passed through to the namenode
 * @param size size passed through to the namenode
 * @return the value returned by {@code callFS()}
 * @throws IOException declared; may be raised by the wrapped call
 */
@Override
public BlocksWithLocations getBlocks(final DatanodeInfo datanode,
    final long size) throws IOException {
  return failoverHandler.new ImmutableFSCaller<BlocksWithLocations>() {
    @Override
    public BlocksWithLocations call() throws IOException {
      // The actual work: a plain delegation to the namenode.
      return namenode.getBlocks(datanode, size);
    }
  }.callFS();
}
/**
 * Returns blocks, together with their locations, on {@code datanode} by
 * delegating to the namesystem once {@code size} has been validated.
 *
 * @param datanode datanode on which the blocks are located
 * @param size total size of the blocks; must be strictly positive
 * @return the result of {@code namesystem.getBlocks(datanode, size)}
 * @throws IllegalArgumentException if {@code size} is zero or negative
 * @throws IOException declared; may be raised by the namesystem call
 */
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
    throws IOException {
  if (size > 0) {
    return namesystem.getBlocks(datanode, size);
  }
  throw new IllegalArgumentException("Unexpected not positive size: " + size);
}
@Override // NamenodeProtocol public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException { if(size <= 0) { throw new IllegalArgumentException( "Unexpected not positive size: "+size); } namesystem.checkSuperuserPrivilege(); return namesystem.getBlockManager().getBlocks(datanode, size); }