/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param lb the located block * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
/** For {@link TestTransferRbw} */ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { assertEquals(2, datanodes.length); final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0], datanodes.length, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(s, writeTimeout), HdfsConstants.SMALL_BUFFER_SIZE)); final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); // send the request new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, new StorageType[]{StorageType.DEFAULT}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param lb the located block * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb); try { DataOutputStream out = new DataOutputStream( new BufferedOutputStream(pair.out, smallBufferSize)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); return PBHelperClient.convert( reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtilsClient.cleanup(null, pair.in, pair.out); } }
/** For {@link TestTransferRbw} */ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { assertEquals(2, datanodes.length); final Socket s = DataStreamer.createSocketForPipeline(datanodes[0], datanodes.length, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(s, writeTimeout), DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration()))); final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); // send the request new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, new StorageType[]{StorageType.DEFAULT}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); }
/** For {@link TestTransferRbw} */ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { assertEquals(2, datanodes.length); final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0], datanodes.length, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(s, writeTimeout), HdfsConstants.SMALL_BUFFER_SIZE)); final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); // send the request new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); }
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, source.getStorageID(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); return proto.getStatus() == Status.SUCCESS; }
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, StorageType.DEFAULT, BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); while (proto.getStatus() == Status.IN_PROGRESS) { proto = BlockOpResponseProto.parseDelimitedFrom(reply); } return proto.getStatus() == Status.SUCCESS; }
/** * For {@link TestTransferRbw} */ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { assertEquals(2, datanodes.length); final Socket s = DFSOutputStream .createSocketForPipeline(datanodes[0], datanodes.length, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final DataOutputStream out = new DataOutputStream( new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), HdfsConstants.SMALL_BUFFER_SIZE)); final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); // send the request new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); }
private boolean replaceBlock(ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, source.getStorageID(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); return proto.getStatus() == Status.SUCCESS; }
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); return proto.getStatus() == Status.SUCCESS; }
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination, StorageType targetStorageType) throws IOException, SocketException { Socket sock = new Socket(); try { sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, targetStorageType, BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); while (proto.getStatus() == Status.IN_PROGRESS) { proto = BlockOpResponseProto.parseDelimitedFrom(reply); } return proto.getStatus() == Status.SUCCESS; } finally { sock.close(); } }
@Override public void run() { LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot); final DfsClientShm shm = (DfsClientShm)slot.getShm(); final DomainSocket shmSock = shm.getPeer().getDomainSocket(); final String path = shmSock.getPath(); boolean success = false; try (DomainSocket sock = DomainSocket.connect(path); DataOutputStream out = new DataOutputStream( new BufferedOutputStream(sock.getOutputStream()))) { new Sender(out).releaseShortCircuitFds(slot.getSlotId()); DataInputStream in = new DataInputStream(sock.getInputStream()); ReleaseShortCircuitAccessResponseProto resp = ReleaseShortCircuitAccessResponseProto.parseFrom( PBHelperClient.vintPrefixed(in)); if (resp.getStatus() != Status.SUCCESS) { String error = resp.hasError() ? resp.getError() : "(unknown)"; throw new IOException(resp.getStatus().toString() + ": " + error); } LOG.trace("{}: released {}", this, slot); success = true; } catch (IOException e) { LOG.error(ShortCircuitCache.this + ": failed to release " + "short-circuit shared memory slot " + slot + " by sending " + "ReleaseShortCircuitAccessRequestProto to " + path + ". Closing shared memory segment.", e); } finally { if (success) { shmManager.freeSlot(slot); } else { shm.getEndpointShmManager().shutdown(shm); } } }
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination, StorageType targetStorageType) throws IOException, SocketException { Socket sock = new Socket(); try { sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, targetStorageType, BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); while (proto.getStatus() == Status.IN_PROGRESS) { proto = BlockOpResponseProto.parseDelimitedFrom(reply); } return proto.getStatus() == Status.SUCCESS; } finally { sock.close(); } }
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final Token<BlockTokenIdentifier> blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; DataOutputStream out = null; DataInputStream in = null; try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, targets, targetStorageTypes); out.flush(); //ack BlockOpResponseProto response = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); IOUtils.closeSocket(sock); } }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param lb the located block * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException(); } else { throw new IOException("Bad response " + reply + " trying to read " + lb.getBlock() + " from datanode " + dn); } } return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; DataOutputStream out = null; DataInputStream in = null; try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); if (dfsClient.shouldEncryptData()) { IOStreamPair encryptedStreams = DataTransferEncryptor.getEncryptedStreams( unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); unbufOut = encryptedStreams.out; unbufIn = encryptedStreams.in; } out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, targets); out.flush(); //ack BlockOpResponseProto response = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); IOUtils.closeSocket(sock); } }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param in input stream from datanode * @param out output stream to datanode * @param lb the located block * @param clientName the name of the DFSClient requesting the checksum * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private static Type inferChecksumTypeByReading( String clientName, SocketFactory socketFactory, int socketTimeout, LocatedBlock lb, DatanodeInfo dn, DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) throws IOException { IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, encryptionKey, dn, socketTimeout); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException(); } else { throw new IOException("Bad response " + reply + " trying to read " + lb.getBlock() + " from datanode " + dn); } } return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; DataOutputStream out = null; DataInputStream in = null; try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); if (dfsClient.shouldEncryptData()) { IOStreamPair encryptedStreams = DataTransferEncryptor .getEncryptedStreams(unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); unbufOut = encryptedStreams.out; unbufIn = encryptedStreams.in; } out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out) .transferBlock(block, blockToken, dfsClient.clientName, targets); out.flush(); //ack BlockOpResponseProto response = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); IOUtils.closeSocket(sock); } }
/** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * * @param sock * An established Socket to the DN. The BlockReader will not close it * normally * @param file * File location * @param block * The block object * @param blockToken * The block token for security * @param startOffset * The read offset, relative to block head * @param len * The number of bytes to read * @param bufferSize * The IO buffer size (not the client buffer size) * @param verifyChecksum * Whether to verify checksum * @param clientName * Client name * @return New BlockReader instance, or null on error. */ public static RemoteBlockReader newBlockReader(Socket sock, String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT))); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream( new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize)); BlockOpResponseProto status = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, sock, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto(checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); if (firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param lb * the located block * @param clientName * the name of the DFSClient requesting the checksum * @param dn * the connected datanode * @return the inferred checksum type * @throws IOException * if an error occurs */ private static Type inferChecksumTypeByReading(String clientName, SocketFactory socketFactory, int socketTimeout, LocatedBlock lb, DatanodeInfo dn, DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) throws IOException { IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, encryptionKey, dn, socketTimeout); try { DataOutputStream out = new DataOutputStream( new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out) .readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException(); } else { throw new IOException( "Bad response " + reply + " trying to read " + lb.getBlock() + " from datanode " + dn); } } return PBHelper .convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }
private void sendRequest(DataOutputStream out) throws IOException { final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb); new Sender(out).replaceBlock(eb, accessToken, source.getStorageID(), proxySource.getDatanode()); }
/** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. This is used for compatibility * with older HDFS versions which did not include the checksum type in * OpBlockChecksumResponseProto. * * @param in input stream from datanode * @param out output stream to datanode * @param lb the located block * @param clientName the name of the DFSClient requesting the checksum * @param dn the connected datanode * @return the inferred checksum type * @throws IOException if an error occurs */ private static Type inferChecksumTypeByReading( String clientName, SocketFactory socketFactory, int socketTimeout, LocatedBlock lb, DatanodeInfo dn, DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) throws IOException { IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname, encryptionKey, dn, socketTimeout); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, HdfsConstants.SMALL_BUFFER_SIZE)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true, CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (reply.getStatus() != Status.SUCCESS) { if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException(); } else { throw new IOException("Bad response " + reply + " trying to read " + lb.getBlock() + " from datanode " + dn); } } return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); } finally { IOUtils.cleanup(null, pair.in, pair.out); } }