LocalDatanodeInfo() { final int cacheSize = 10000; final float hashTableLoadFactor = 0.75f; int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1; cache = Collections .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>( hashTableCapacity, hashTableLoadFactor, true) { private static final long serialVersionUID = 1; @Override protected boolean removeEldestEntry( Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) { return size() > cacheSize; } }); }
@Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { synchronized(this) { final Replica replica = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); if (replica == null) { throw new ReplicaNotFoundException(block); } if (replica.getGenerationStamp() < block.getGenerationStamp()) { throw new IOException( "Replica generation stamp < block generation stamp, block=" + block + ", replica=" + replica); } else if (replica.getGenerationStamp() > block.getGenerationStamp()) { block.setGenerationStamp(replica.getGenerationStamp()); } } File datafile = getBlockFile(block); File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp()); BlockLocalPathInfo info = new BlockLocalPathInfo(block, datafile.getAbsolutePath(), metafile.getAbsolutePath()); return info; }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); Preconditions.checkNotNull(data, "Storage not yet initialized"); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { GetBlockLocalPathInfoRequestProto req = GetBlockLocalPathInfoRequestProto.newBuilder() .setBlock(PBHelper.convert(block)) .setToken(PBHelper.convert(token)).build(); GetBlockLocalPathInfoResponseProto resp; try { resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } return new BlockLocalPathInfo(PBHelper.convert(resp.getBlock()), resp.getLocalPath(), resp.getLocalMetaPath()); }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { GetBlockLocalPathInfoRequestProto req = GetBlockLocalPathInfoRequestProto.newBuilder() .setBlock(PBHelperClient.convert(block)) .setToken(PBHelperClient.convert(token)).build(); GetBlockLocalPathInfoResponseProto resp; try { resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } return new BlockLocalPathInfo(PBHelperClient.convert(resp.getBlock()), resp.getLocalPath(), resp.getLocalMetaPath()); }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ); Preconditions.checkNotNull(data, "Storage not yet initialized"); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
@Override public GetBlockLocalPathInfoResponseProto getBlockLocalPathInfo( RpcController unused, GetBlockLocalPathInfoRequestProto request) throws ServiceException { BlockLocalPathInfo resp; try { resp = impl.getBlockLocalPathInfo( PBHelperClient.convert(request.getBlock()), PBHelperClient.convert(request.getToken())); } catch (IOException e) { throw new ServiceException(e); } return GetBlockLocalPathInfoResponseProto.newBuilder() .setBlock(PBHelperClient.convert(resp.getBlock())) .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath()) .build(); }
private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname) throws IOException { LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); BlockLocalPathInfo pathinfo = null; ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, conf, timeout, connectToDnViaHostname); try { // make RPC to local datanode to find local pathnames of blocks pathinfo = proxy.getBlockLocalPathInfo(blk, token); if (pathinfo != null) { if (LOG.isDebugEnabled()) { LOG.debug("Cached location of block " + blk + " as " + pathinfo); } localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); } } catch (IOException e) { localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error throw e; } return pathinfo; }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
LocalDatanodeInfo() { final int cacheSize = 10000; final float hashTableLoadFactor = 0.75f; int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1; cache = Collections.synchronizedMap( new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>( hashTableCapacity, hashTableLoadFactor, true) { private static final long serialVersionUID = 1; @Override protected boolean removeEldestEntry( Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) { return size() > cacheSize; } }); }
private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname) throws IOException { LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); BlockLocalPathInfo pathinfo = null; ClientDatanodeProtocol proxy = localDatanodeInfo .getDatanodeProxy(node, conf, timeout, connectToDnViaHostname); try { // make RPC to local datanode to find local pathnames of blocks pathinfo = proxy.getBlockLocalPathInfo(blk, token); if (pathinfo != null) { if (LOG.isDebugEnabled()) { LOG.debug("Cached location of block " + blk + " as " + pathinfo); } localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); } } catch (IOException e) { localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error throw e; } return pathinfo; }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace( "getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { GetBlockLocalPathInfoRequestProto req = GetBlockLocalPathInfoRequestProto.newBuilder() .setBlock(PBHelper.convert(block)).setToken(PBHelper.convert(token)) .build(); GetBlockLocalPathInfoResponseProto resp; try { resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } return new BlockLocalPathInfo(PBHelper.convert(resp.getBlock()), resp.getLocalPath(), resp.getLocalMetaPath()); }
@Override public GetBlockLocalPathInfoResponseProto getBlockLocalPathInfo( RpcController unused, GetBlockLocalPathInfoRequestProto request) throws ServiceException { BlockLocalPathInfo resp; try { resp = impl.getBlockLocalPathInfo(PBHelper.convert(request.getBlock()), PBHelper.convert(request.getToken())); } catch (IOException e) { throw new ServiceException(e); } return GetBlockLocalPathInfoResponseProto.newBuilder() .setBlock(PBHelper.convert(resp.getBlock())) .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath()) .build(); }
LocalDatanodeInfo() { final int cacheSize = 10000; final float hashTableLoadFactor = 0.75f; int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1; cache = Collections .synchronizedMap(new LinkedHashMap<Block, BlockLocalPathInfo>( hashTableCapacity, hashTableLoadFactor, true) { private static final long serialVersionUID = 1; @Override protected boolean removeEldestEntry( Map.Entry<Block, BlockLocalPathInfo> eldest) { return size() > cacheSize; } }); }
private static BlockLocalPathInfo getBlockPathInfo(Block blk, DatanodeInfo node, Configuration conf, int timeout, Token<BlockTokenIdentifier> token) throws IOException { LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort); BlockLocalPathInfo pathinfo = null; ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node, conf, timeout); try { // make RPC to local datanode to find local pathnames of blocks pathinfo = proxy.getBlockLocalPathInfo(blk, token); if (pathinfo != null) { if (LOG.isDebugEnabled()) { LOG.debug("Cached location of block " + blk + " as " + pathinfo); } localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); } } catch (IOException e) { localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error throw e; } return pathinfo; }
private BlockReaderLocal(Configuration conf, String hdfsfile, Block block, Token<BlockTokenIdentifier> token, long startOffset, long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException { super( new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/, 1); this.startOffset = startOffset; this.dataIn = dataIn; long toSkip = startOffset; while (toSkip > 0) { long skipped = dataIn.skip(toSkip); if (skipped == 0) { throw new IOException("Couldn't initialize input stream"); } toSkip -= skipped; } }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(Block block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } myMetrics.incrBlocksGetLocalPathInfo(); return info; }