/**
 * Opens a socket to the data transfer port of the given datanode and returns
 * the stream pair to talk to it. The raw socket streams are first handed to
 * the SASL client (which may wrap them), then the input side is wrapped in a
 * {@link DataInputStream} and the output side in a buffered
 * {@link DataOutputStream}.
 *
 * <p>If anything fails before the pair is fully built, the socket is closed
 * before the exception propagates.
 *
 * @param dn datanode to connect to
 * @param timeout used both as connect timeout and as socket read timeout
 * @param conf configuration used to look up the output buffer size
 * @param saslClient client performing the SASL negotiation on the socket
 * @param socketFactory factory creating the socket
 * @param connectToDnViaHostname passed to {@code dn.getXferAddr} to select
 *        the transfer address form
 * @param dekFactory data encryption key factory handed to the SASL client
 * @param blockToken block access token handed to the SASL client
 * @return the wrapped stream pair
 * @throws IOException if connecting or negotiating fails
 */
public static IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
    Configuration conf, SaslDataTransferClient saslClient,
    SocketFactory socketFactory, boolean connectToDnViaHostname,
    DataEncryptionKeyFactory dekFactory,
    Token<BlockTokenIdentifier> blockToken) throws IOException {
  Socket sock = null;
  boolean handedOff = false;
  try {
    sock = socketFactory.createSocket();
    final String xferAddr = dn.getXferAddr(connectToDnViaHostname);
    LOG.debug("Connecting to datanode {}", xferAddr);
    NetUtils.connect(sock, NetUtils.createSocketAddr(xferAddr), timeout);
    sock.setSoTimeout(timeout);

    final OutputStream rawOut = NetUtils.getOutputStream(sock);
    final InputStream rawIn = NetUtils.getInputStream(sock);
    final IOStreamPair negotiated = saslClient.newSocketSend(
        sock, rawOut, rawIn, dekFactory, blockToken, dn);

    final DataInputStream dataIn = new DataInputStream(negotiated.in);
    final int bufferSize = NuCypherExtUtilClient.getSmallBufferSize(conf);
    final DataOutputStream dataOut = new DataOutputStream(
        new BufferedOutputStream(negotiated.out, bufferSize));
    final IOStreamPair wrapped = new IOStreamPair(dataIn, dataOut);

    handedOff = true;
    return wrapped;
  } finally {
    // Only the failure path owns the socket; on success the caller does.
    if (!handedOff) {
      IOUtils.closeSocket(sock);
    }
  }
}
/** * Checks if an address is already trusted and then sends client SASL * negotiation if required. * * @param addr connection address * @param underlyingOut connection output stream * @param underlyingIn connection input stream * @param encryptionKeyFactory for creation of an encryption key * @param accessToken connection block access token * @param datanodeId ID of destination DataNode * @return new pair of streams, wrapped after SASL negotiation * @throws IOException for any error */ private IOStreamPair checkTrustAndSend(InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) throws IOException { if (!trustedChannelResolver.isTrusted() && !trustedChannelResolver.isTrusted(addr)) { // The encryption key factory only returns a key if encryption is enabled. DataEncryptionKey encryptionKey = encryptionKeyFactory.newDataEncryptionKey(); return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken, datanodeId); } else { LOG.debug( "SASL client skipping handshake on trusted connection for addr = {}, " + "datanodeId = {}", addr, datanodeId); return null; } }
/**
 * Runs the client side of the specialized encrypted SASL handshake. The SASL
 * properties, user name and password are all derived from the supplied
 * encryption key.
 *
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @param encryptionKey for an encrypted SASL handshake
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getEncryptedStreams(OutputStream underlyingOut,
    InputStream underlyingIn, DataEncryptionKey encryptionKey)
    throws IOException {
  final Map<String, String> props =
      createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm);

  LOG.debug("Client using encryption algorithm {}",
      encryptionKey.encryptionAlgorithm);

  final String user = getUserNameFromEncryptionKey(encryptionKey);
  final char[] secret = encryptionKeyToPassword(encryptionKey.encryptionKey);
  final CallbackHandler handler = new SaslClientCallbackHandler(user, secret);
  return doSaslHandshake(underlyingOut, underlyingIn, user, props, handler);
}
/**
 * Runs the server side of the specialized encrypted SASL handshake.
 *
 * <p>No handshake is performed when the peer already has a secure channel or
 * its address is trusted by the configured resolver; in that case the
 * underlying streams are returned as they are.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation, or the
 *         unwrapped underlying streams when the handshake is skipped
 * @throws IOException for any error
 */
private IOStreamPair getEncryptedStreams(Peer peer,
    OutputStream underlyingOut, InputStream underlyingIn)
    throws IOException {
  final boolean handshakeNotNeeded = peer.hasSecureChannel()
      || dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer));
  if (handshakeNotNeeded) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  final Map<String, String> props =
      createSaslPropertiesForEncryption(dnConf.getEncryptionAlgorithm());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Server using encryption algorithm "
        + dnConf.getEncryptionAlgorithm());
  }

  // The password for a user is derived from the encryption key that the
  // user name refers to.
  final PasswordFunction passwordLookup = new PasswordFunction() {
    @Override
    public char[] apply(String userName) throws IOException {
      return encryptionKeyToPassword(getEncryptionKeyFromUserName(userName));
    }
  };
  final CallbackHandler handler = new SaslServerCallbackHandler(passwordLookup);
  return doSaslHandshake(underlyingOut, underlyingIn, props, handler);
}
/**
 * Runs the server side of the general-purpose SASL handshake.
 *
 * <p>No handshake is performed when the peer already has a secure channel or
 * its address is trusted by the configured resolver; in that case the
 * underlying streams are returned as they are.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation, or the
 *         unwrapped underlying streams when the handshake is skipped
 * @throws IOException for any error
 */
private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
    InputStream underlyingIn) throws IOException {
  final boolean handshakeNotNeeded = peer.hasSecureChannel()
      || dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer));
  if (handshakeNotNeeded) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  // Server-side SASL properties are resolved per peer address.
  final SaslPropertiesResolver resolver = dnConf.getSaslPropsResolver();
  final Map<String, String> props =
      resolver.getServerProperties(getPeerAddress(peer));

  final PasswordFunction passwordLookup = new PasswordFunction() {
    @Override
    public char[] apply(String userName) throws IOException {
      return buildServerPassword(userName);
    }
  };
  final CallbackHandler handler = new SaslServerCallbackHandler(passwordLookup);
  return doSaslHandshake(underlyingOut, underlyingIn, props, handler);
}
/**
 * Wraps the given streams in a
 * {@link org.apache.hadoop.crypto.CryptoInputStream} and a
 * {@link org.apache.hadoop.crypto.CryptoOutputStream} using the negotiated
 * cipher option.
 *
 * <p>The server reads with the option's "in" key/IV and writes with its
 * "out" key/IV; the client uses them the other way around.
 *
 * @param conf the configuration
 * @param cipherOption negotiated cipher option
 * @param out underlying output stream
 * @param in underlying input stream
 * @param isServer is server side
 * @return IOStreamPair the stream pair
 * @throws IOException for any error
 */
public static IOStreamPair createStreamPair(Configuration conf,
    CipherOption cipherOption, OutputStream out, InputStream in,
    boolean isServer) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Creating IOStreamPair of CryptoInputStream and "
        + "CryptoOutputStream.");
  }
  final CryptoCodec codec =
      CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());

  final byte[] inKey = cipherOption.getInKey();
  final byte[] inIv = cipherOption.getInIv();
  final byte[] outKey = cipherOption.getOutKey();
  final byte[] outIv = cipherOption.getOutIv();

  // Pick the key material for each direction depending on which side we are.
  final byte[] readKey;
  final byte[] readIv;
  final byte[] writeKey;
  final byte[] writeIv;
  if (isServer) {
    readKey = inKey;
    readIv = inIv;
    writeKey = outKey;
    writeIv = outIv;
  } else {
    readKey = outKey;
    readIv = outIv;
    writeKey = inKey;
    writeIv = inIv;
  }

  final InputStream cryptoIn = new CryptoInputStream(in, codec, readKey, readIv);
  final OutputStream cryptoOut =
      new CryptoOutputStream(out, codec, writeKey, writeIv);
  return new IOStreamPair(cryptoIn, cryptoOut);
}
/**
 * Opens a socket to the data transfer port of the given datanode and returns
 * the stream pair produced by the SASL client for that socket.
 *
 * <p>If anything fails before the pair is obtained, the socket is closed
 * before the exception propagates.
 *
 * @param dn datanode to connect to
 * @param timeout used both as connect timeout and as socket read timeout
 * @param lb located block whose block token is used for the negotiation
 * @return the stream pair returned by the SASL client
 * @throws IOException if connecting or negotiating fails
 */
private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
    LocatedBlock lb) throws IOException {
  Socket sock = null;
  boolean handedOff = false;
  try {
    sock = socketFactory.createSocket();
    final String xferAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to datanode " + xferAddr);
    }
    NetUtils.connect(sock, NetUtils.createSocketAddr(xferAddr), timeout);
    sock.setSoTimeout(timeout);

    final OutputStream rawOut = NetUtils.getOutputStream(sock);
    final InputStream rawIn = NetUtils.getInputStream(sock);
    final IOStreamPair negotiated = saslClient.newSocketSend(
        sock, rawOut, rawIn, this, lb.getBlockToken(), dn);

    handedOff = true;
    return negotiated;
  } finally {
    // Only the failure path owns the socket; on success the caller does.
    if (!handedOff) {
      IOUtils.closeSocket(sock);
    }
  }
}
/**
 * Determines the checksum type of a replica by issuing an OP_READ_BLOCK for
 * its first byte and reading the checksum type from the response. This is
 * used for compatibility with older HDFS versions which did not include the
 * checksum type in OpBlockChecksumResponseProto.
 *
 * <p>Both streams of the connection are cleaned up before returning,
 * whether the read succeeded or not.
 *
 * @param lb the located block
 * @param dn the connected datanode
 * @return the inferred checksum type
 * @throws IOException if an error occurs
 */
private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
    throws IOException {
  final IOStreamPair streams =
      connectToDN(dn, dfsClientConf.socketTimeout, lb);
  try {
    final DataOutputStream toDn = new DataOutputStream(
        new BufferedOutputStream(streams.out,
            HdfsConstants.SMALL_BUFFER_SIZE));
    final DataInputStream fromDn = new DataInputStream(streams.in);

    // Request exactly one byte starting at offset 0.
    final Sender sender = new Sender(toDn);
    sender.readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1,
        true, CachingStrategy.newDefaultStrategy());

    final BlockOpResponseProto response =
        BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(fromDn));
    final String logInfo =
        "trying to read " + lb.getBlock() + " from datanode " + dn;
    DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);

    return PBHelper.convert(
        response.getReadOpChecksumInfo().getChecksum().getType());
  } finally {
    IOUtils.cleanup(null, streams.in, streams.out);
  }
}
/**
 * Wraps the given streams in a
 * {@link org.apache.hadoop.crypto.CryptoInputStream} and a
 * {@link org.apache.hadoop.crypto.CryptoOutputStream} using the negotiated
 * cipher option.
 *
 * <p>The server reads with the option's "in" key/IV and writes with its
 * "out" key/IV; the client uses them the other way around.
 *
 * @param conf the configuration
 * @param cipherOption negotiated cipher option
 * @param out underlying output stream
 * @param in underlying input stream
 * @param isServer is server side
 * @return IOStreamPair the stream pair
 * @throws IOException for any error
 */
public static IOStreamPair createStreamPair(Configuration conf,
    CipherOption cipherOption, OutputStream out, InputStream in,
    boolean isServer) throws IOException {
  LOG.debug("Creating IOStreamPair of CryptoInputStream and "
      + "CryptoOutputStream.");
  final CryptoCodec codec =
      CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());

  final byte[] inKey = cipherOption.getInKey();
  final byte[] inIv = cipherOption.getInIv();
  final byte[] outKey = cipherOption.getOutKey();
  final byte[] outIv = cipherOption.getOutIv();

  // Pick the key material for each direction depending on which side we are.
  final byte[] readKey;
  final byte[] readIv;
  final byte[] writeKey;
  final byte[] writeIv;
  if (isServer) {
    readKey = inKey;
    readIv = inIv;
    writeKey = outKey;
    writeIv = outIv;
  } else {
    readKey = outKey;
    readIv = outIv;
    writeKey = inKey;
    writeIv = inIv;
  }

  final InputStream cryptoIn = new CryptoInputStream(in, codec, readKey, readIv);
  final OutputStream cryptoOut =
      new CryptoOutputStream(out, codec, writeKey, writeIv);
  return new IOStreamPair(cryptoIn, cryptoOut);
}
/**
 * Opens a socket to the data transfer port of the given datanode and returns
 * the stream pair produced by the SASL client for that socket.
 *
 * <p>If anything fails before the pair is obtained, the socket is closed
 * before the exception propagates.
 *
 * @param dn datanode to connect to
 * @param timeout used both as connect timeout and as socket read timeout
 * @param lb located block whose block token is used for the negotiation
 * @return the stream pair returned by the SASL client
 * @throws IOException if connecting or negotiating fails
 */
private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
    LocatedBlock lb) throws IOException {
  Socket sock = null;
  boolean handedOff = false;
  try {
    sock = socketFactory.createSocket();
    final String xferAddr =
        dn.getXferAddr(getConf().isConnectToDnViaHostname());
    LOG.debug("Connecting to datanode {}", xferAddr);
    NetUtils.connect(sock, NetUtils.createSocketAddr(xferAddr), timeout);
    sock.setSoTimeout(timeout);

    final OutputStream rawOut = NetUtils.getOutputStream(sock);
    final InputStream rawIn = NetUtils.getInputStream(sock);
    final IOStreamPair negotiated = saslClient.newSocketSend(
        sock, rawOut, rawIn, this, lb.getBlockToken(), dn);

    handedOff = true;
    return negotiated;
  } finally {
    // Only the failure path owns the socket; on success the caller does.
    if (!handedOff) {
      IOUtils.closeSocket(sock);
    }
  }
}
/**
 * Determines the checksum type of a replica by issuing an OP_READ_BLOCK for
 * its first byte and reading the checksum type from the response. This is
 * used for compatibility with older HDFS versions which did not include the
 * checksum type in OpBlockChecksumResponseProto.
 *
 * <p>Both streams of the connection are cleaned up before returning,
 * whether the read succeeded or not.
 *
 * @param lb the located block
 * @param dn the connected datanode
 * @return the inferred checksum type
 * @throws IOException if an error occurs
 */
private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
    throws IOException {
  final IOStreamPair streams =
      connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
  try {
    final DataOutputStream toDn = new DataOutputStream(
        new BufferedOutputStream(streams.out, smallBufferSize));
    final DataInputStream fromDn = new DataInputStream(streams.in);

    // Request exactly one byte starting at offset 0.
    final Sender sender = new Sender(toDn);
    sender.readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1,
        true, CachingStrategy.newDefaultStrategy());

    final BlockOpResponseProto response =
        BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(fromDn));
    final String logInfo =
        "trying to read " + lb.getBlock() + " from datanode " + dn;
    DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);

    return PBHelperClient.convert(
        response.getReadOpChecksumInfo().getChecksum().getType());
  } finally {
    IOUtilsClient.cleanup(null, streams.in, streams.out);
  }
}
protected RemoteBlockReader2(String file, String bpid, long blockId, ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock, IOStreamPair ioStreams) { // Path is used only for printing block and file information in debug this.dnSock = dnSock; this.ioStreams = ioStreams; this.in = in; this.checksum = checksum; this.verifyChecksum = verifyChecksum; this.startOffset = Math.max(startOffset, 0); this.filename = file; // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at // the beginning in order to chunk-align. Note that the DN may elect // to send more than this amount if the read starts/ends mid-chunk. this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); }
/** * Give an unused socket to the cache. * @param sock socket not used by anyone. */ public synchronized void put(Socket sock, IOStreamPair ioStreams) { Preconditions.checkNotNull(sock); SocketAndStreams s = new SocketAndStreams(sock, ioStreams); if (capacity <= 0) { // Cache disabled. s.close(); return; } startExpiryDaemon(); SocketAddress remoteAddr = sock.getRemoteSocketAddress(); if (remoteAddr == null) { LOG.warn("Cannot cache (unconnected) socket with no remote address: " + sock); IOUtils.closeSocket(sock); return; } if (capacity == multimap.size()) { evictOldest(); } multimap.put(remoteAddr, s); }
/**
 * Wraps the given streams in SASL streams, so that communication over them
 * may henceforth be encrypted, depending on the negotiated quality of
 * protection. The wrappers are bound to the SASL client when one is set and
 * to the SASL server otherwise.
 *
 * @param out output stream to wrap
 * @param in input stream to wrap
 * @return IOStreamPair wrapping the streams
 */
public IOStreamPair createStreamPair(DataOutputStream out,
    DataInputStream in) {
  if (saslClient == null) {
    // No client present: wrap with the server side instead.
    final SaslInputStream serverIn = new SaslInputStream(in, saslServer);
    final SaslOutputStream serverOut = new SaslOutputStream(out, saslServer);
    return new IOStreamPair(serverIn, serverOut);
  }
  final SaslInputStream clientIn = new SaslInputStream(in, saslClient);
  final SaslOutputStream clientOut = new SaslOutputStream(out, saslClient);
  return new IOStreamPair(clientIn, clientOut);
}
/**
 * Runs the client side of the general-purpose SASL handshake. The SASL
 * properties are resolved for the connection address, and the user name and
 * password are both built from the block access token.
 *
 * @param addr connection address
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @param accessToken connection block access token
 * @param datanodeId ID of destination DataNode (not read by this method)
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getSaslStreams(InetAddress addr,
    OutputStream underlyingOut, InputStream underlyingIn,
    Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
    throws IOException {
  final Map<String, String> props =
      saslPropsResolver.getClientProperties(addr);

  final String user = buildUserName(accessToken);
  final char[] secret = buildClientPassword(accessToken);
  final CallbackHandler handler = new SaslClientCallbackHandler(user, secret);

  return doSaslHandshake(underlyingOut, underlyingIn, user, props, handler);
}
/**
 * Creates a peer that wraps another peer and uses the supplied stream pair
 * for its input and output.
 *
 * <p>The channel is set to the input stream when that stream is itself a
 * {@link ReadableByteChannel}, and to {@code null} otherwise.
 *
 * @param enclosedPeer the peer being wrapped
 * @param ios stream pair supplying this peer's input and output streams
 */
public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) {
  this.enclosedPeer = enclosedPeer;
  this.in = ios.in;
  this.out = ios.out;
  if (ios.in instanceof ReadableByteChannel) {
    this.channel = (ReadableByteChannel) ios.in;
  } else {
    this.channel = null;
  }
}