/**
 * Builds a {@link Peer} for the given socket and then hands it to the SASL
 * client, returning whatever peer the SASL client produces.
 *
 * If either step throws, the most recent peer reference (which may still be
 * null when {@code peerFromSocket} itself failed) is passed to
 * {@code IOUtilsClient.cleanup}. The socket {@code s} is not touched directly
 * by this method on failure.
 *
 * @param saslClient client used to perform {@code peerSend} on the new peer
 * @param s socket to wrap
 * @param keyFactory forwarded to {@code peerSend}
 * @param blockToken forwarded to {@code peerSend}
 * @param datanodeId forwarded to {@code peerSend}
 * @return the peer returned by {@code saslClient.peerSend}
 * @throws IOException if wrapping the socket or the SASL exchange fails
 */
public static Peer peerFromSocketAndKey(
    SaslDataTransferClient saslClient, Socket s,
    DataEncryptionKeyFactory keyFactory,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer current = null;
  boolean handedOff = false;
  try {
    current = peerFromSocket(s);
    current = saslClient.peerSend(current, keyFactory, blockToken,
        datanodeId);
    handedOff = true;
  } finally {
    // Only clean up when we are not returning the peer to the caller.
    if (!handedOff) {
      IOUtilsClient.cleanup(null, current);
    }
  }
  return current;
}
protected RemoteBlockReader2(String file, String bpid, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache) { this.isLocal = DFSClient.isLocalAddress(NetUtils. createSocketAddr(datanodeID.getXferAddr())); // Path is used only for printing block and file information in debug this.peer = peer; this.datanodeID = datanodeID; this.in = peer.getInputStreamChannel(); this.checksum = checksum; this.verifyChecksum = verifyChecksum; this.startOffset = Math.max( startOffset, 0 ); this.filename = file; this.peerCache = peerCache; this.blockId = blockId; // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at // the beginning in order to chunk-align. Note that the DN may elect // to send more than this amount if the read starts/ends mid-chunk. this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); }
/**
 * Receives SASL negotiation for specialized encrypted handshake.
 *
 * The incoming streams are returned unwrapped, with no negotiation, when the
 * peer reports a secure channel or its address is trusted by the configured
 * trusted channel resolver. Otherwise a SASL handshake is run with properties
 * derived from the configured encryption algorithm, and the password for a
 * user name is derived from the encryption key looked up for that user name.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getEncryptedStreams(Peer peer,
    OutputStream underlyingOut, InputStream underlyingIn)
    throws IOException {
  if (peer.hasSecureChannel()) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }
  if (dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  Map<String, String> saslProps = createSaslPropertiesForEncryption(
      dnConf.getEncryptionAlgorithm());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Server using encryption algorithm " +
        dnConf.getEncryptionAlgorithm());
  }

  // Maps the SASL user name to the password expected for that user.
  PasswordFunction passwordLookup = new PasswordFunction() {
    @Override
    public char[] apply(String userName) throws IOException {
      return encryptionKeyToPassword(
          getEncryptionKeyFromUserName(userName));
    }
  };
  CallbackHandler handler = new SaslServerCallbackHandler(passwordLookup);

  return doSaslHandshake(underlyingOut, underlyingIn, saslProps, handler);
}
/**
 * Receives SASL negotiation for general-purpose handshake.
 *
 * The incoming streams are returned unwrapped, with no negotiation, when the
 * peer reports a secure channel or its address is trusted by the configured
 * trusted channel resolver. Otherwise the server-side SASL properties are
 * resolved for the peer address and a handshake is run, with the password
 * for a user name obtained from {@code buildServerPassword}.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
    InputStream underlyingIn) throws IOException {
  if (peer.hasSecureChannel()) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }
  if (dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  Map<String, String> saslProps = dnConf.getSaslPropsResolver()
      .getServerProperties(getPeerAddress(peer));

  // Maps the SASL user name to the password expected for that user.
  PasswordFunction passwordLookup = new PasswordFunction() {
    @Override
    public char[] apply(String userName) throws IOException {
      return buildServerPassword(userName);
    }
  };
  CallbackHandler handler = new SaslServerCallbackHandler(passwordLookup);

  return doSaslHandshake(underlyingOut, underlyingIn, saslProps, handler);
}
/**
 * Get the next domain-socket peer -- either from the cache or by creating it.
 *
 * The peer cache is consulted only while {@code remainingCacheTries} is
 * positive; this method does not modify that counter itself.
 *
 * @return a {@code BlockReaderPeer} wrapping either a cached peer (flagged
 *     as coming from the cache) or a new {@code DomainPeer}, or null if the
 *     domain socket factory did not produce a socket.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    Peer cached = clientContext.getPeerCache().get(datanode, true);
    if (cached != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + cached);
      }
      return new BlockReaderPeer(cached, true);
    }
  }

  // Nothing usable in the cache: try to open a fresh domain socket.
  DomainSocket sock = clientContext.getDomainSocketFactory()
      .createSocket(pathInfo, conf.socketTimeout);
  return (sock == null)
      ? null
      : new BlockReaderPeer(new DomainPeer(sock), false);
}
@Override // RemotePeerFactory public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; boolean success = false; Socket sock = null; try { sock = socketFactory.createSocket(); NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), dfsClientConf.socketTimeout); peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, blockToken, datanodeId); peer.setReadTimeout(dfsClientConf.socketTimeout); success = true; return peer; } finally { if (!success) { IOUtils.cleanup(LOG, peer); IOUtils.closeSocket(sock); } } }
/**
 * Removes and returns the first usable cached peer for the given datanode
 * and peer kind.
 *
 * Entries are removed from the cache as they are examined. An entry whose
 * age is at least {@code expiryPeriod} is closed and skipped; a younger
 * entry is returned if its peer is not already closed, otherwise it is
 * simply dropped.
 *
 * @param dnId datanode to look up
 * @param isDomain second component of the cache key
 * @return an open, non-expired peer, or null if none was found
 */
private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) {
  List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
  if (sockStreamList == null) {
    return null;
  }
  Iterator<Value> iter = sockStreamList.iterator();
  while (iter.hasNext()) {
    Value candidate = iter.next();
    iter.remove();
    long ageMs = Time.monotonicNow() - candidate.getTime();
    Peer peer = candidate.getPeer();
    if (ageMs >= expiryPeriod) {
      try {
        peer.close();
      } catch (IOException e) {
        // Best effort: keep scanning, but record the cause of the failure.
        LOG.warn("got IOException closing stale peer " + peer +
            ", which is " + ageMs + " ms old", e);
      }
    } else if (!peer.isClosed()) {
      return peer;
    }
  }
  return null;
}
private DataXceiver(Peer peer, DataNode datanode, DataXceiverServer dataXceiverServer) throws IOException { this.peer = peer; this.dnConf = datanode.getDnConf(); this.socketIn = peer.getInputStream(); this.socketOut = peer.getOutputStream(); this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; remoteAddress = peer.getRemoteAddressString(); final int colonIdx = remoteAddress.indexOf(':'); remoteAddressWithoutPort = (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx); localAddress = peer.getLocalAddressString(); if (LOG.isDebugEnabled()) { LOG.debug("Number of active connections is: " + datanode.getXceiverCount()); } }
protected RemoteBlockReader2(String file, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. createSocketAddr(datanodeID.getXferAddr())); // Path is used only for printing block and file information in debug this.peer = peer; this.datanodeID = datanodeID; this.in = peer.getInputStreamChannel(); this.checksum = checksum; this.verifyChecksum = verifyChecksum; this.startOffset = Math.max( startOffset, 0 ); this.filename = file; this.peerCache = peerCache; this.blockId = blockId; // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at // the beginning in order to chunk-align. Note that the DN may elect // to send more than this amount if the read starts/ends mid-chunk. this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); this.tracer = tracer; }
@Override // RemotePeerFactory public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; boolean success = false; Socket sock = null; final int socketTimeout = dfsClientConf.getSocketTimeout(); try { sock = socketFactory.createSocket(); NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout); peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this, blockToken, datanodeId); peer.setReadTimeout(socketTimeout); peer.setWriteTimeout(socketTimeout); success = true; return peer; } finally { if (!success) { IOUtilsClient.cleanup(LOG, peer); IOUtils.closeSocket(sock); } } }
private DataXceiver(Peer peer, DataNode datanode, DataXceiverServer dataXceiverServer) throws IOException { super(datanode.getTracer()); this.peer = peer; this.dnConf = datanode.getDnConf(); this.socketIn = peer.getInputStream(); this.socketOut = peer.getOutputStream(); this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(datanode.getConf()); this.smallBufferSize = DFSUtilClient.getSmallBufferSize(datanode.getConf()); remoteAddress = peer.getRemoteAddressString(); final int colonIdx = remoteAddress.indexOf(':'); remoteAddressWithoutPort = (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx); localAddress = peer.getLocalAddressString(); if (LOG.isDebugEnabled()) { LOG.debug("Number of active connections is: " + datanode.getXceiverCount()); } }
/**
 * Opens a socket to {@code addr}, wraps it in a {@link Peer} via
 * {@code DFSUtilClient.peerFromSocketAndKey} using the datanode's SASL
 * client and the encryption key factory for block {@code b}, and applies the
 * datanode socket timeout to both reads and writes on the peer.
 *
 * Previously only the read timeout was configured, so a stalled write on the
 * returned peer was not bounded by the socket timeout.
 *
 * On any failure the partially constructed peer and the socket are both
 * cleaned up before the exception propagates.
 *
 * @param b block used to select the data encryption key factory
 * @param addr address to connect to
 * @param blockToken token forwarded to the peer handshake
 * @param datanodeId datanode identity forwarded to the peer handshake
 * @return a connected peer with read and write timeouts set
 * @throws IOException if connecting or the handshake fails
 */
private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer peer = null;
  boolean success = false;
  Socket sock = null;
  final int socketTimeout = datanode.getDnConf().getSocketTimeout();
  try {
    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
    NetUtils.connect(sock, addr, socketTimeout);
    peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(),
        sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
        blockToken, datanodeId);
    peer.setReadTimeout(socketTimeout);
    // Bound writes as well as reads.
    peer.setWriteTimeout(socketTimeout);
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtils.cleanup(null, peer);
      IOUtils.closeSocket(sock);
    }
  }
}
protected RemoteBlockReader2(String file, String bpid, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache) { this.isLocal = DFSClient.isLocalAddress(NetUtils. createSocketAddr(datanodeID.getXferAddr())); // Path is used only for printing block and file information in debug this.peer = peer; this.datanodeID = datanodeID; this.in = peer.getInputStreamChannel(); this.checksum = checksum; this.verifyChecksum = verifyChecksum; this.startOffset = Math.max( startOffset, 0 ); this.filename = file; this.peerCache = peerCache; // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at // the beginning in order to chunk-align. Note that the DN may elect // to send more than this amount if the read starts/ends mid-chunk. this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); }
/**
 * Verifies that a block-operation response reports success, throwing
 * otherwise.
 *
 * The exception message identifies both ends of the connection, the file,
 * and the block (pool id, block id and generation stamp).
 *
 * @param status response whose status code is inspected
 * @param peer connection the response arrived on; used for the message
 * @param block block the operation targeted; used for the message
 * @param file file name; used for the message
 * @throws InvalidBlockTokenException if the status is
 *     {@code ERROR_ACCESS_TOKEN}
 * @throws IOException for any other non-success status
 */
static void checkSuccess(
    BlockOpResponseProto status, Peer peer,
    ExtendedBlock block, String file)
    throws IOException {
  final Status result = status.getStatus();
  if (result == Status.SUCCESS) {
    return;
  }

  // Shared tail of both error messages.
  final String context = peer.getLocalAddressString()
      + ", remote=" + peer.getRemoteAddressString()
      + ", for file " + file
      + ", for pool " + block.getBlockPoolId()
      + " block " + block.getBlockId() + "_" + block.getGenerationStamp();

  if (result == Status.ERROR_ACCESS_TOKEN) {
    throw new InvalidBlockTokenException(
        "Got access token error for OP_READ_BLOCK, self=" + context);
  }
  throw new IOException("Got error for OP_READ_BLOCK, self=" + context);
}
/**
 * Opens a socket to {@code addr} using the DFS client's socket factory and
 * socket timeout, then wraps it in a {@link Peer} using the client's data
 * encryption key.
 *
 * If any step fails, the peer (if one was created) and the socket are both
 * closed quietly before the exception propagates.
 *
 * @param addr address to connect to
 * @return the connected peer
 * @throws IOException if connecting or building the peer fails
 */
private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
  Socket rawSocket = null;
  Peer connected = null;
  boolean established = false;
  try {
    rawSocket = dfsClient.socketFactory.createSocket();
    NetUtils.connect(rawSocket, addr,
        dfsClient.getRandomLocalInterfaceAddr(),
        dfsClient.getConf().socketTimeout);
    connected = TcpPeerServer.peerFromSocketAndKey(rawSocket,
        dfsClient.getDataEncryptionKey());
    established = true;
  } finally {
    // Release whatever was acquired if setup did not complete.
    if (!established) {
      IOUtils.closeQuietly(connected);
      IOUtils.closeQuietly(rawSocket);
    }
  }
  return connected;
}
/** * Get a cached peer connected to the given DataNode. * @param dnId The DataNode to get a Peer for. * @param isDomain Whether to retrieve a DomainPeer or not. * * @return An open Peer connected to the DN, or null if none * was found. */ public synchronized Peer get(DatanodeID dnId, boolean isDomain) { if (capacity <= 0) { // disabled return null; } List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain)); if (sockStreamList == null) { return null; } Iterator<Value> iter = sockStreamList.iterator(); while (iter.hasNext()) { Value candidate = iter.next(); iter.remove(); if (!candidate.getPeer().isClosed()) { return candidate.getPeer(); } } return null; }
/** * Give an unused socket to the cache. * @param sock socket not used by anyone. */ public synchronized void put(DatanodeID dnId, Peer peer) { Preconditions.checkNotNull(dnId); Preconditions.checkNotNull(peer); if (peer.isClosed()) return; if (capacity <= 0) { // Cache disabled. IOUtils.cleanup(LOG, peer); return; } startExpiryDaemon(); if (capacity == multimap.size()) { evictOldest(); } multimap.put(new Key(dnId, peer.getDomainSocket() != null), new Value(peer, Time.monotonicNow())); }
/**
 * Creates an xceiver bound to one peer connection.
 *
 * Captures the peer's input/output streams and its remote and local address
 * strings, along with the datanode's configuration.
 *
 * @param peer connection being served
 * @param datanode owning datanode; also the source of the DN configuration
 * @param dataXceiverServer server that accepted the connection
 * @throws IOException declared by the signature; presumably from obtaining
 *     the peer's streams
 */
private DataXceiver(Peer peer, DataNode datanode,
    DataXceiverServer dataXceiverServer) throws IOException {
  // Connection and its streams.
  this.peer = peer;
  this.dnConf = datanode.getDnConf();
  this.socketIn = peer.getInputStream();
  this.socketOut = peer.getOutputStream();

  // Owning components.
  this.datanode = datanode;
  this.dataXceiverServer = dataXceiverServer;
  this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;

  // Endpoint descriptions.
  remoteAddress = peer.getRemoteAddressString();
  localAddress = peer.getLocalAddressString();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of active connections is: "
        + datanode.getXceiverCount());
  }
}