@Override
public void readBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final long blockOffset,
    final long length,
    final boolean sendChecksum,
    final CachingStrategy cachingStrategy) throws IOException {

  OpReadBlockProto proto = OpReadBlockProto.newBuilder()
      .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName,
          blockToken))
      .setOffset(blockOffset)
      .setLen(length)
      .setSendChecksums(sendChecksum)
      .setCachingStrategy(getCachingStrategy(cachingStrategy))
      .build();

  send(out, Op.READ_BLOCK, proto);
}
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
  OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        proto.getOffset(),
        proto.getLen(),
        proto.getSendChecksums(),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()));
  } finally {
    if (traceScope != null) traceScope.close();
  }
}
/**
 * Infer the checksum type for a replica by sending an OP_READ_BLOCK
 * for the first byte of that replica. This is used for compatibility
 * with older HDFS versions which did not include the checksum type in
 * OpBlockChecksumResponseProto.
 *
 * @param lb the located block
 * @param dn the connected datanode
 * @return the inferred checksum type
 * @throws IOException if an error occurs
 */
private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
    throws IOException {
  IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);

  try {
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
        HdfsConstants.SMALL_BUFFER_SIZE));
    DataInputStream in = new DataInputStream(pair.in);

    new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
        0, 1, true, CachingStrategy.newDefaultStrategy());
    final BlockOpResponseProto reply =
        BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
    String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
    DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);

    return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
  } finally {
    IOUtils.cleanup(null, pair.in, pair.out);
  }
}
/**
 * Infer the checksum type for a replica by sending an OP_READ_BLOCK
 * for the first byte of that replica. This is used for compatibility
 * with older HDFS versions which did not include the checksum type in
 * OpBlockChecksumResponseProto.
 *
 * @param lb the located block
 * @param dn the connected datanode
 * @return the inferred checksum type
 * @throws IOException if an error occurs
 */
private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
    throws IOException {
  IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);

  try {
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(pair.out, smallBufferSize));
    DataInputStream in = new DataInputStream(pair.in);

    new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
        0, 1, true, CachingStrategy.newDefaultStrategy());
    final BlockOpResponseProto reply =
        BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
    String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
    DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);

    return PBHelperClient.convert(
        reply.getReadOpChecksumInfo().getChecksum().getType());
  } finally {
    IOUtilsClient.cleanup(null, pair.in, pair.out);
  }
}
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
                     DFSClient dfsClient, String src,
                     Progressable progress, DataChecksum checksum,
                     AtomicReference<CachingStrategy> cachingStrategy,
                     ByteArrayManager byteArrayManage,
                     boolean isAppend, String[] favoredNodes) {
  this.block = block;
  this.dfsClient = dfsClient;
  this.src = src;
  this.progress = progress;
  this.stat = stat;
  this.checksum4WriteBlock = checksum;
  this.cachingStrategy = cachingStrategy;
  this.byteArrayManager = byteArrayManage;
  this.isLazyPersistFile = isLazyPersist(stat);
  this.isAppend = isAppend;
  this.favoredNodes = favoredNodes;

  final DfsClientConf conf = dfsClient.getConf();
  this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
  this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
  this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
}
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
  OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    readBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        proto.getOffset(),
        proto.getLen(),
        proto.getSendChecksums(),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()));
  } finally {
    if (traceScope != null) traceScope.close();
  }
}
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
  final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
  writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
      PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
      proto.getHeader().getClientName(),
      PBHelper.convert(proto.getTargetsList()),
      PBHelper.convert(proto.getSource()),
      fromProto(proto.getStage()),
      proto.getPipelineSize(),
      proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
      proto.getLatestGenerationStamp(),
      fromProto(proto.getRequestedChecksum()),
      (proto.hasCachingStrategy() ?
          getCachingStrategy(proto.getCachingStrategy()) :
          CachingStrategy.newDefaultStrategy()));
}
private void testWrite(ExtendedBlock block, BlockConstructionStage stage,
    long newGS, String description, Boolean eofExcepted) throws IOException {
  sendBuf.reset();
  recvBuf.reset();
  sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
      new DatanodeInfo[1], null, stage, 0, block.getNumBytes(),
      block.getNumBytes(), newGS, DEFAULT_CHECKSUM,
      CachingStrategy.newDefaultStrategy());
  if (eofExcepted) {
    sendResponse(Status.ERROR, null, null, recvOut);
    sendRecvData(description, true);
  } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    // ok finally write a block with 0 len
    sendResponse(Status.SUCCESS, "", null, recvOut);
    sendRecvData(description, false);
  } else {
    writeZeroLengthPacket(block, description);
  }
}
/**
 * Get a BlockReader for the given block.
 */
public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
    throws IOException {
  InetSocketAddress targetAddr = null;
  Socket sock = null;
  ExtendedBlock block = testBlock.getBlock();
  DatanodeInfo[] nodes = testBlock.getLocations();
  targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
  sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
  sock.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
  sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);

  return BlockReaderFactory.newBlockReader(
      new DFSClient.Conf(conf),
      targetAddr.toString() + ":" + block.getBlockId(), block,
      testBlock.getBlockToken(),
      offset, lenToRead,
      true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
      nodes[0], null, null, null, false,
      CachingStrategy.newDefaultStrategy());
}
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
    HdfsFileStatus stat, DataChecksum checksum) throws IOException {
  super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
  this.dfsClient = dfsClient;
  this.src = src;
  this.fileId = stat.getFileId();
  this.blockSize = stat.getBlockSize();
  this.blockReplication = stat.getReplication();
  this.progress = progress;
  this.cachingStrategy = new AtomicReference<CachingStrategy>(
      dfsClient.getDefaultWriteCachingStrategy());
  if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug(
        "Set non-null progress callback on DFSOutputStream " + src);
  }

  final int bytesPerChecksum = checksum.getBytesPerChecksum();
  if (bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
    throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
        ") and blockSize(" + blockSize +
        ") do not match. blockSize should be a " +
        "multiple of io.bytes.per.checksum");
  }
  this.checksum = checksum;
}
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
    HdfsFileStatus stat, DataChecksum checksum) throws IOException {
  super(getChecksum4Compute(checksum, stat));
  this.dfsClient = dfsClient;
  this.src = src;
  this.fileId = stat.getFileId();
  this.blockSize = stat.getBlockSize();
  this.blockReplication = stat.getReplication();
  this.fileEncryptionInfo = stat.getFileEncryptionInfo();
  this.progress = progress;
  this.cachingStrategy = new AtomicReference<CachingStrategy>(
      dfsClient.getDefaultWriteCachingStrategy());
  if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug(
        "Set non-null progress callback on DFSOutputStream " + src);
  }

  this.bytesPerChecksum = checksum.getBytesPerChecksum();
  if (bytesPerChecksum <= 0) {
    throw new HadoopIllegalArgumentException(
        "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
  }
  if (blockSize % bytesPerChecksum != 0) {
    throw new HadoopIllegalArgumentException("Invalid values: "
        + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
        + ") must divide block size (=" + blockSize + ").");
  }
  this.checksum4WriteBlock = checksum;

  this.dfsclientSlowLogThresholdMs =
      dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
  this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
}
@Override
public void setDropBehind(Boolean dropBehind) throws IOException {
  CachingStrategy prevStrategy, nextStrategy;
  // CachingStrategy is immutable.  So build a new CachingStrategy with the
  // modifications we want, and compare-and-swap it in.
  do {
    prevStrategy = this.cachingStrategy.get();
    nextStrategy = new CachingStrategy.Builder(prevStrategy).
        setDropBehind(dropBehind).build();
  } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
}
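The compare-and-swap loop above lets concurrent callers update the immutable CachingStrategy without a lock. Below is a minimal, hypothetical usage sketch (not from the HDFS sources) of how an application reaches these hooks through the public CanSetReadahead/CanSetDropBehind methods on FSDataInputStream; the path and readahead size are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CachingStrategyUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical file; assumes the default FileSystem is HDFS and the
    // underlying stream supports these hints (otherwise they throw
    // UnsupportedOperationException).
    try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) {
      in.setReadahead(4L * 1024 * 1024); // ask for ~4 MB of readahead
      in.setDropBehind(true);            // hint: drop pages from the OS cache after use
      byte[] buf = new byte[8192];
      in.read(buf);
    }
  }
}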
@Override
public synchronized void setReadahead(Long readahead) throws IOException {
  synchronized (infoLock) {
    this.cachingStrategy =
        new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
  }
  closeCurrentBlockReader();
}
@Override
public synchronized void setDropBehind(Boolean dropBehind) throws IOException {
  synchronized (infoLock) {
    this.cachingStrategy =
        new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
  }
  closeCurrentBlockReader();
}
public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
  long readahead = cachingStrategy.getReadahead() != null ?
      cachingStrategy.getReadahead() :
      DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  this.maxReadahead = (int) Math.min(Integer.MAX_VALUE, readahead);
  return this;
}
static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
  CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
  if (cachingStrategy.getReadahead() != null) {
    builder.setReadahead(cachingStrategy.getReadahead().longValue());
  }
  if (cachingStrategy.getDropBehind() != null) {
    builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
  }
  return builder.build();
}
static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) {
  Boolean dropBehind = strategy.hasDropBehind() ? strategy.getDropBehind() : null;
  Long readahead = strategy.hasReadahead() ? strategy.getReadahead() : null;
  return new CachingStrategy(dropBehind, readahead);
}
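Taken together, the two private converters above preserve "unset" (null) fields across the wire, so a field the client never specified comes back as null and the datanode can fall back to its own configured default. The following standalone round-trip sketch illustrates that null-vs-set behavior; it is not part of the HDFS sources, and the package paths are assumed from the 2.x layout.

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

public class CachingStrategyRoundTrip {
  public static void main(String[] args) {
    // dropBehind left unset (null), readahead explicitly 4096 bytes
    CachingStrategy partial = new CachingStrategy(null, 4096L);

    // Serialize: only non-null fields are written, mirroring the sender-side helper above.
    CachingStrategyProto.Builder b = CachingStrategyProto.newBuilder();
    if (partial.getReadahead() != null) {
      b.setReadahead(partial.getReadahead());
    }
    if (partial.getDropBehind() != null) {
      b.setDropBehind(partial.getDropBehind());
    }
    CachingStrategyProto proto = b.build();

    // Deserialize: a missing field is restored as null, mirroring the receiver-side helper.
    Boolean dropBehind = proto.hasDropBehind() ? proto.getDropBehind() : null;
    Long readahead = proto.hasReadahead() ? proto.getReadahead() : null;
    CachingStrategy restored = new CachingStrategy(dropBehind, readahead);

    System.out.println(proto.hasDropBehind());   // false: field omitted on the wire
    System.out.println(restored.getReadahead()); // 4096
  }
}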
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
  final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
  final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelper.convertStorageType(proto.getStorageType()),
        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        targets,
        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
        PBHelper.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
        proto.getLatestGenerationStamp(),
        fromProto(proto.getRequestedChecksum()),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()),
        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
        (proto.hasPinning() ? proto.getPinning() : false),
        (PBHelper.convertBooleanList(proto.getTargetPinningsList())));
  } finally {
    if (traceScope != null) traceScope.close();
  }
}
void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
    long newGS, DataChecksum checksum) throws IOException {
  sender.writeBlock(block, StorageType.DEFAULT,
      BlockTokenSecretManager.DUMMY_TOKEN, "cl",
      new DatanodeInfo[1], new StorageType[1], null, stage,
      0, block.getNumBytes(), block.getNumBytes(), newGS,
      checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
}
protected BlockReader getBlockReader(LocatedBlock targetBlock,
    long offsetInBlock, long length, InetSocketAddress targetAddr,
    StorageType storageType, DatanodeInfo datanode) throws IOException {
  ExtendedBlock blk = targetBlock.getBlock();
  Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
  CachingStrategy curCachingStrategy;
  boolean shortCircuitForbidden;
  synchronized (infoLock) {
    curCachingStrategy = cachingStrategy;
    shortCircuitForbidden = shortCircuitForbidden();
  }
  return new BlockReaderFactory(dfsClient.getConf()).
      setInetSocketAddress(targetAddr).
      setRemotePeerFactory(dfsClient).
      setDatanodeInfo(datanode).
      setStorageType(storageType).
      setFileName(src).
      setBlock(blk).
      setBlockToken(accessToken).
      setStartOffset(offsetInBlock).
      setVerifyChecksum(verifyChecksum).
      setClientName(dfsClient.clientName).
      setLength(length).
      setCachingStrategy(curCachingStrategy).
      setAllowShortCircuitLocalReads(!shortCircuitForbidden).
      setClientCacheContext(dfsClient.getClientContext()).
      setUserGroupInformation(dfsClient.ugi).
      setConfiguration(dfsClient.getConfiguration()).
      setTracer(dfsClient.getTracer()).
      build();
}
@Override
public synchronized void setReadahead(Long readahead) throws IOException {
  synchronized (infoLock) {
    this.cachingStrategy =
        new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
  }
  closeCurrentBlockReaders();
}
@Override
public synchronized void setDropBehind(Boolean dropBehind) throws IOException {
  synchronized (infoLock) {
    this.cachingStrategy =
        new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
  }
  closeCurrentBlockReaders();
}
public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
  long readahead = cachingStrategy.getReadahead() != null ?
      cachingStrategy.getReadahead() :
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  this.maxReadahead = (int) Math.min(Integer.MAX_VALUE, readahead);
  return this;
}
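Both setCachingStrategy builders above clamp the long readahead hint to Integer.MAX_VALUE before the int cast, so an oversized hint is capped rather than overflowing. A tiny standalone illustration of that arithmetic (a sketch, not HDFS code):

public class ReadaheadClamp {
  public static void main(String[] args) {
    long readahead = 8L * 1024 * 1024 * 1024;                // 8 GiB requested
    int maxReadahead = (int) Math.min(Integer.MAX_VALUE, readahead);
    System.out.println(maxReadahead);                         // prints 2147483647
  }
}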