/**
 * Opens a connection to the data transfer port of the given datanode and
 * returns the resulting stream pair. The raw socket streams are passed
 * through the supplied SASL client (encryption wrapping, etc.) before
 * being handed back to the caller.
 *
 * @param dn datanode to connect to
 * @param timeout used both as the connect timeout and as the socket
 *                read timeout (SO_TIMEOUT)
 * @param conf configuration used to look up the small buffer size for
 *             the buffered output stream
 * @param saslClient client that performs the handshake on the socket streams
 * @param socketFactory factory that creates the socket
 * @param connectToDnViaHostname passed to {@code dn.getXferAddr} when
 *                               resolving the transfer address
 * @param dekFactory data encryption key factory given to the SASL client
 * @param blockToken block token given to the SASL client
 * @return a pair made of a {@link DataInputStream} and a buffered
 *         {@link DataOutputStream} on top of the negotiated streams
 * @throws IOException if connecting or the handshake fails; the socket is
 *                     closed before the exception propagates
 */
public static IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
    Configuration conf, SaslDataTransferClient saslClient,
    SocketFactory socketFactory, boolean connectToDnViaHostname,
    DataEncryptionKeyFactory dekFactory,
    Token<BlockTokenIdentifier> blockToken) throws IOException {
  Socket socket = null;
  boolean connected = false;
  try {
    socket = socketFactory.createSocket();

    final String xferAddr = dn.getXferAddr(connectToDnViaHostname);
    LOG.debug("Connecting to datanode {}", xferAddr);
    NetUtils.connect(socket, NetUtils.createSocketAddr(xferAddr), timeout);
    socket.setSoTimeout(timeout);

    final OutputStream rawOut = NetUtils.getOutputStream(socket);
    final InputStream rawIn = NetUtils.getInputStream(socket);
    final IOStreamPair negotiated = saslClient.newSocketSend(
        socket, rawOut, rawIn, dekFactory, blockToken, dn);

    final DataInputStream dataIn = new DataInputStream(negotiated.in);
    final DataOutputStream dataOut = new DataOutputStream(
        new BufferedOutputStream(negotiated.out,
            NuCypherExtUtilClient.getSmallBufferSize(conf)));
    final IOStreamPair wrapped = new IOStreamPair(dataIn, dataOut);

    connected = true;
    return wrapped;
  } finally {
    // Only release the socket when setup did not complete; on success the
    // returned streams keep using it.
    if (!connected) {
      IOUtils.closeSocket(socket);
    }
  }
}
/**
 * Builds a {@link Peer} on top of the given socket and passes it through
 * the SASL client's {@code peerSend}.
 *
 * @param saslClient client whose {@code peerSend} wraps the raw peer
 * @param s socket the peer is created from
 * @param keyFactory data encryption key factory handed to the SASL client
 * @param blockToken block token handed to the SASL client
 * @param datanodeId datanode ID handed to the SASL client
 * @return the peer returned by {@code peerSend}
 * @throws IOException if creating the peer or {@code peerSend} fails; a
 *                     peer that was already created is cleaned up first
 */
public static Peer peerFromSocketAndKey(
    SaslDataTransferClient saslClient, Socket s,
    DataEncryptionKeyFactory keyFactory,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer rawPeer = null;
  boolean established = false;
  try {
    rawPeer = peerFromSocket(s);
    final Peer negotiatedPeer =
        saslClient.peerSend(rawPeer, keyFactory, blockToken, datanodeId);
    established = true;
    return negotiatedPeer;
  } finally {
    // On failure only the raw peer can exist (peerSend did not return),
    // so that is the one to release; it is still null if peerFromSocket
    // itself threw.
    if (!established) {
      IOUtilsClient.cleanup(null, rawPeer);
    }
  }
}
/**
 * Creates a {@link Peer} for the given socket and hands it to the SASL
 * client's {@code peerSend}, returning whatever peer that call produces.
 *
 * @param saslClient client whose {@code peerSend} wraps the raw peer
 * @param s socket the peer is created from
 * @param keyFactory data encryption key factory passed to the SASL client
 * @param blockToken block token passed to the SASL client
 * @param datanodeId datanode ID passed to the SASL client
 * @return the peer returned by {@code peerSend}
 * @throws IOException if peer creation or {@code peerSend} fails; any
 *                     peer created so far is cleaned up before propagating
 */
public static Peer peerFromSocketAndKey(
    SaslDataTransferClient saslClient, Socket s,
    DataEncryptionKeyFactory keyFactory,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer socketPeer = null;
  boolean completed = false;
  try {
    socketPeer = peerFromSocket(s);
    final Peer saslPeer =
        saslClient.peerSend(socketPeer, keyFactory, blockToken, datanodeId);
    completed = true;
    return saslPeer;
  } finally {
    if (!completed) {
      // peerSend never returned, so the socket-level peer (possibly still
      // null) is the only thing that may need releasing.
      IOUtils.cleanup(null, socketPeer);
    }
  }
}
/**
 * Creates a DataEncryptionKeyFactory bound to the block pool of the given
 * block. Each call to the factory asks the BlockPoolTokenSecretManager for
 * a key for that block pool ID, or yields {@code null} when
 * {@code dnConf.encryptDataTransfer} is false.
 *
 * @param block for which the factory needs to create a key
 * @return DataEncryptionKeyFactory for block's block pool ID
 */
DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
    final ExtendedBlock block) {
  return new DataEncryptionKeyFactory() {
    @Override
    public DataEncryptionKey newDataEncryptionKey() {
      // The flag is read on every call, not captured at factory creation.
      if (!dnConf.encryptDataTransfer) {
        return null;
      }
      return blockPoolTokenSecretManager.generateDataEncryptionKey(
          block.getBlockPoolId());
    }
  };
}
/**
 * Creates a DataEncryptionKeyFactory tied to the block pool of the given
 * block. When invoked, the factory requests a key for that block pool ID
 * from the BlockPoolTokenSecretManager; if
 * {@code dnConf.encryptDataTransfer} is false it returns {@code null}.
 *
 * @param block for which the factory needs to create a key
 * @return DataEncryptionKeyFactory for block's block pool ID
 */
public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
    final ExtendedBlock block) {
  return new DataEncryptionKeyFactory() {
    @Override
    public DataEncryptionKey newDataEncryptionKey() {
      // Evaluated per call, so the flag is consulted each time a key is
      // requested.
      if (!dnConf.encryptDataTransfer) {
        return null;
      }
      return blockPoolTokenSecretManager.generateDataEncryptionKey(
          block.getBlockPoolId());
    }
  };
}
/**
 * Initialize output/input streams for transferring data to each target
 * and send the create block request.
 *
 * <p>Targets are handled independently: a failure on one target is logged
 * and recorded in {@code targetsStatus}, and the loop moves on to the
 * next target. For every target that succeeds, the socket and its streams
 * are stored in {@code targetSockets}, {@code targetOutputStreams} and
 * {@code targetInputStreams} at the same index.
 *
 * @param targetsStatus output array; entry {@code i} is set to whether
 *                      target {@code i} was initialized successfully
 * @return the number of targets initialized successfully
 */
private int initTargetStreams(boolean[] targetsStatus) {
  int nsuccess = 0;
  for (int i = 0; i < targets.length; i++) {
    Socket socket = null;
    DataOutputStream out = null;
    DataInputStream in = null;
    boolean success = false;
    try {
      InetSocketAddress targetAddr = getSocketAddress4Transfer(targets[i]);
      socket = datanode.newSocket();
      NetUtils.connect(socket, targetAddr,
          datanode.getDnConf().getSocketTimeout());
      socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());

      ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
      Token<BlockTokenIdentifier> blockToken =
          datanode.getBlockAccessToken(block,
              EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));

      long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
      OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
      InputStream unbufIn = NetUtils.getInputStream(socket);
      DataEncryptionKeyFactory keyFactory =
          datanode.getDataEncryptionKeyFactoryForBlock(block);
      IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
          socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);

      // From here on, talk through the streams returned by the SASL client.
      unbufOut = saslStreams.out;
      unbufIn = saslStreams.in;

      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
          DFSUtilClient.getSmallBufferSize(conf)));
      in = new DataInputStream(unbufIn);

      DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
      new Sender(out).writeBlock(block, targetStorageTypes[i],
          blockToken, "", new DatanodeInfo[]{targets[i]},
          new StorageType[]{targetStorageTypes[i]}, source,
          BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
          checksum, cachingStrategy, false, false, null);

      targetSockets[i] = socket;
      targetOutputStreams[i] = out;
      targetInputStreams[i] = in;
      nsuccess++;
      success = true;
    } catch (Throwable e) {
      // Deliberately best-effort: one bad target must not abort the others.
      // Pass the throwable itself so the exception type, stack trace and
      // cause are kept (getMessage() alone may even be null), and name the
      // target so the failure can be attributed.
      LOG.warn("Failed to initialize streams to target " + targets[i], e);
    } finally {
      if (!success) {
        // Release whatever was opened for this target before moving on.
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        IOUtils.closeStream(socket);
      }
    }
    targetsStatus[i] = success;
  }
  return nsuccess;
}