/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param lb the located block * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param lb the located block * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb); try { DataOutputStream out = new DataOutputStream( new BufferedOutputStream(pair.out, smallBufferSize)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); return PBHelperClient.convert( reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtilsClient.cleanup(null, pair.in, pair.out); } }
static void checkSuccess( BlockOpResponseProto status, Peer peer, ExtendedBlock block, String file) throws IOException { String logInfo = "for OP_READ_BLOCK" + ", self=" + peer.getLocalAddressString() + ", remote=" + peer.getRemoteAddressString() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); }
/** Receive a block copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { BlockOpResponseProto response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); while (response.getStatus() == Status.IN_PROGRESS) { // read intermediate responses response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); } String logInfo = "block move is failed"; DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); }
private void writeSuccessWithChecksumInfo(BlockSender blockSender, DataOutputStream out) throws IOException { ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder() .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum())) .setChunkOffset(blockSender.getOffset()) .build(); BlockOpResponseProto response = BlockOpResponseProto.newBuilder() .setStatus(SUCCESS) .setReadOpChecksumInfo(ckInfo) .build(); response.writeDelimitedTo(out); out.flush(); }
/** Receive a reportedBlock copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { BlockOpResponseProto response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); while (response.getStatus() == Status.IN_PROGRESS) { // read intermediate responses response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); } String logInfo = "reportedBlock move is failed"; DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param sock * An established Socket to the DN. The BlockReader will not close it * normally * @param file * File location * @param block * The block object * @param blockToken * The block token for security * @param startOffset * The read offset, relative to block head * @param len * The number of bytes to read * @param bufferSize * The IO buffer size (not the client buffer size) * @param verifyChecksum * Whether to verify checksum * @param clientName * Client name * @return New BlockReader instance, or null on error. */ public static RemoteBlockReader newBlockReader(Socket sock, String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT))); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream( new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, sock, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto(checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if (firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); }
private void writeSuccessWithChecksumInfo(BlockSender blockSender, DataOutputStream out) throws IOException { ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder() .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum())) .setChunkOffset(blockSender.getOffset()).build(); BlockOpResponseProto response = BlockOpResponseProto.newBuilder().setStatus(SUCCESS) .setReadOpChecksumInfo(ckInfo).build(); response.writeDelimitedTo(out); out.flush(); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param file File location * @param block The block object * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read * @param bufferSize The IO buffer size (not the client buffer size) * @param verifyChecksum Whether to verify checksum * @param clientName Client name * @return New BlockReader instance, or null on error. */ public static RemoteBlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream( new BufferedInputStream(peer.getInputStream(), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param file File location * @param block The block object * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read * @param verifyChecksum Whether to verify checksum * @param clientName Client name * @param peer The Peer to use * @param datanodeID The DatanodeID this peer is connected to * @return New BlockReader instance, or null on error. */ public static BlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); // // Get bytes in block // DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param file File location * @param block The block object * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read * @param bufferSize The IO buffer size (not the client buffer size) * @param verifyChecksum Whether to verify checksum * @param clientName Client name * @return New BlockReader instance, or null on error. */ public static RemoteBlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy, Tracer tracer) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream( new BufferedInputStream(peer.getInputStream(), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache, tracer); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param file File location * @param block The block object * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read * @param verifyChecksum Whether to verify checksum * @param clientName Client name * @param peer The Peer to use * @param datanodeID The DatanodeID this peer is connected to * @return New BlockReader instance, or null on error. */ public static BlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy, Tracer tracer) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); // // Get bytes in block // DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(in)); checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader2(file, block.getBlockId(), checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache, tracer); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param sock An established Socket to the DN. The BlockReader will not close it normally * @param file File location * @param block The block object * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read * @param bufferSize The IO buffer size (not the client buffer size) * @param verifyChecksum Whether to verify checksum * @param clientName Client name * @return New BlockReader instance, or null on error. */ public static RemoteBlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream( new BufferedInputStream(peer.getInputStream(), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param sock An established Socket to the DN. The BlockReader will not close it normally. * This socket must have an associated Channel. * @param file File location * @param block The block object * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read * @param verifyChecksum Whether to verify checksum * @param clientName Client name * @param peer The Peer to use * @param datanodeID The DatanodeID this peer is connected to * @return New BlockReader instance, or null on error. */ public static BlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); // // Get bytes in block // DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param sock An established Socket to the DN. The BlockReader will not close it normally * @param file File location * @param block The block object * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read * @param bufferSize The IO buffer size (not the client buffer size) * @param verifyChecksum Whether to verify checksum * @param clientName Client name * @return New BlockReader instance, or null on error. */ public static RemoteBlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream( new BufferedInputStream(peer.getInputStream(), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache); }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param sock An established Socket to the DN. The BlockReader will not close it normally. * This socket must have an associated Channel. * @param file File location * @param block The block object * @param blockToken The block token for security * @param startOffset The read offset, relative to block head * @param len The number of bytes to read * @param verifyChecksum Whether to verify checksum * @param clientName Client name * @param peer The Peer to use * @param datanodeID The DatanodeID this peer is connected to * @return New BlockReader instance, or null on error. */ public static BlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum); // // Get bytes in block // DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache); }
private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) { Enum<?>[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); blockCopy.setNumBytes(locatedBlock.getBlockSize()); ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy)) .setToken(PB_HELPER.convert(locatedBlock.getBlockToken()))) .setClientName(clientName).build(); ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) .setRequestedChecksum(checksumProto) .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length); for (int i = 0; i < datanodeInfos.length; i++) { DatanodeInfo dnInfo = datanodeInfos[i]; Enum<?> storageType = storageTypes[i]; Promise<Channel> promise = eventLoopGroup.next().newPromise(); futureList.add(promise); String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); new Bootstrap().group(eventLoopGroup).channel(channelClass) .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // we need to get the remote address of the channel so we can only move on after // channel connected. Leave an empty implementation here because netty does not allow // a null handler. } }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder, timeoutMs, client, locatedBlock.getBlockToken(), promise); } else { promise.tryFailure(future.cause()); } } }); } return futureList; }