private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; DataOutputStream out = null; DataInputStream in = null; try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); if (dfsClient.shouldEncryptData()) { IOStreamPair encryptedStreams = DataTransferEncryptor.getEncryptedStreams( unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); unbufOut = encryptedStreams.out; unbufIn = encryptedStreams.in; } out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, targets); out.flush(); //ack BlockOpResponseProto response = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); IOUtils.closeSocket(sock); } }
/**
 * Creates a peer whose input and output streams are the encrypted streams
 * that {@code DataTransferEncryptor.getEncryptedStreams} builds on top of
 * the streams of {@code enclosedPeer}.
 *
 * <p>{@code channel} is set to the encrypted input stream when that stream
 * is also a {@code ReadableByteChannel}, and to {@code null} otherwise.
 *
 * @param enclosedPeer peer whose streams are wrapped
 * @param key data encryption key handed to the encryptor
 * @throws IOException if obtaining the underlying or encrypted streams fails
 */
public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key)
    throws IOException {
  this.enclosedPeer = enclosedPeer;

  final IOStreamPair encrypted = DataTransferEncryptor.getEncryptedStreams(
      enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key);
  this.in = encrypted.in;
  this.out = encrypted.out;

  // Only expose a channel if the encrypted input actually supports one.
  if (encrypted.in instanceof ReadableByteChannel) {
    this.channel = (ReadableByteChannel) encrypted.in;
  } else {
    this.channel = null;
  }
}
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; DataOutputStream out = null; DataInputStream in = null; try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); if (dfsClient.shouldEncryptData()) { IOStreamPair encryptedStreams = DataTransferEncryptor .getEncryptedStreams(unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); unbufOut = encryptedStreams.out; unbufIn = encryptedStreams.in; } out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out) .transferBlock(block, blockToken, dfsClient.clientName, targets); out.flush(); //ack BlockOpResponseProto response = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); IOUtils.closeSocket(sock); } }
/**
 * Sends this pending block move to the target datanode and waits for the
 * response.
 *
 * <p>The method connects to {@code target.datanode.getXferAddr()}, wraps the
 * socket streams with encryption when {@code nnc.getDataEncryptionKey()} is
 * non-null, then calls {@code sendRequest} and {@code receiveResponse}. On
 * success {@code bytesMoved} is increased by the block size. Any
 * {@link IOException}, including a read timeout, is logged as a warning and
 * not propagated.
 *
 * <p>In all cases the streams and socket are closed, this pending block is
 * removed from both the proxy source and the target, {@code reset()} is
 * called, and threads waiting on the enclosing {@code Balancer} are notified.
 */
private void dispatch() {
  // The connect timeout below does not bound later reads. Without a read
  // timeout, a datanode that accepts the request but never answers would
  // block this method forever and the cleanup in the finally clause would
  // never run. A block move can be slow, so use a generous 20 minute limit.
  final int blockMoveReadTimeoutMs = 20 * 60 * 1000;

  Socket sock = new Socket();
  DataOutputStream out = null;
  DataInputStream in = null;
  try {
    sock.connect(
        NetUtils.createSocketAddr(target.datanode.getXferAddr()),
        HdfsServerConstants.READ_TIMEOUT);
    sock.setSoTimeout(blockMoveReadTimeoutMs);
    sock.setKeepAlive(true);

    OutputStream unbufOut = sock.getOutputStream();
    InputStream unbufIn = sock.getInputStream();
    if (nnc.getDataEncryptionKey() != null) {
      // An encryption key is available: talk through the encrypted streams.
      IOStreamPair encryptedStreams =
          DataTransferEncryptor.getEncryptedStreams(
              unbufOut, unbufIn, nnc.getDataEncryptionKey());
      unbufOut = encryptedStreams.out;
      unbufIn = encryptedStreams.in;
    }
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        HdfsConstants.IO_FILE_BUFFER_SIZE));
    in = new DataInputStream(new BufferedInputStream(unbufIn,
        HdfsConstants.IO_FILE_BUFFER_SIZE));

    sendRequest(out);
    receiveResponse(in);
    bytesMoved.inc(block.getNumBytes());
    LOG.info( "Moving block " + block.getBlock().getBlockId() +
        " from "+ source.getDisplayName() + " to " +
        target.getDisplayName() + " through " +
        proxySource.getDisplayName() +
        " is succeeded." );
  } catch (IOException e) {
    // Failures (timeouts included) are reported but deliberately not rethrown.
    LOG.warn("Error moving block "+block.getBlockId()+
        " from " + source.getDisplayName() + " to " +
        target.getDisplayName() + " through " +
        proxySource.getDisplayName() +
        ": "+e.getMessage());
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(sock);

    // Release the bookkeeping for this move on both ends.
    proxySource.removePendingBlock(this);
    target.removePendingBlock(this);

    synchronized (this ) {
      reset();
    }
    // Wake up any thread waiting on the balancer for this move to finish.
    synchronized (Balancer.this) {
      Balancer.this.notifyAll();
    }
  }
}
/**
 * Executes this pending block move: connects to the target datanode, sends
 * the request via {@code sendRequest}, and reads the reply via
 * {@code receiveResponse}.
 *
 * <p>If {@code nnc.getDataEncryptionKey()} is non-null the socket streams are
 * replaced by the encrypted pair from {@code DataTransferEncryptor}. A
 * successful move adds the block size to {@code bytesMoved}. An
 * {@link IOException} (which includes a read timeout) is caught and logged
 * as a warning; it is never thrown to the caller.
 *
 * <p>The {@code finally} clause always closes the streams and socket, removes
 * this pending block from the proxy source and the target, calls
 * {@code reset()}, and notifies waiters on the enclosing {@code Balancer}.
 */
private void dispatch() {
  // Reads on a socket block indefinitely unless SO_TIMEOUT is set, and the
  // timeout passed to connect() only covers connection establishment. Cap
  // each read at 20 minutes: long enough for a slow block move, short enough
  // that an unresponsive datanode cannot pin this move forever.
  final int blockMoveReadTimeoutMs = 20 * 60 * 1000;

  Socket sock = new Socket();
  DataOutputStream out = null;
  DataInputStream in = null;
  try {
    sock.connect(NetUtils.createSocketAddr(target.datanode.getXferAddr()),
        HdfsServerConstants.READ_TIMEOUT);
    sock.setSoTimeout(blockMoveReadTimeoutMs);
    sock.setKeepAlive(true);

    OutputStream unbufOut = sock.getOutputStream();
    InputStream unbufIn = sock.getInputStream();
    if (nnc.getDataEncryptionKey() != null) {
      // Encryption key present: switch to the encrypted stream pair.
      IOStreamPair encryptedStreams = DataTransferEncryptor
          .getEncryptedStreams(unbufOut, unbufIn, nnc.getDataEncryptionKey());
      unbufOut = encryptedStreams.out;
      unbufIn = encryptedStreams.in;
    }
    out = new DataOutputStream(
        new BufferedOutputStream(unbufOut, HdfsConstants.IO_FILE_BUFFER_SIZE));
    in = new DataInputStream(
        new BufferedInputStream(unbufIn, HdfsConstants.IO_FILE_BUFFER_SIZE));

    sendRequest(out);
    receiveResponse(in);
    bytesMoved.inc(block.getNumBytes());
    LOG.info("Moving block " + block.getBlock().getBlockId() + " from "
        + source.getDisplayName() + " to " + target.getDisplayName()
        + " through " + proxySource.getDisplayName() + " is succeeded.");
  } catch (IOException e) {
    // Report the failure (timeouts included) without rethrowing.
    LOG.warn("Error moving block " + block.getBlockId() + " from "
        + source.getDisplayName() + " to " + target.getDisplayName()
        + " through " + proxySource.getDisplayName() + ": " + e.getMessage());
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(sock);

    // Drop this move from the pending lists on both nodes.
    proxySource.removePendingBlock(this);
    target.removePendingBlock(this);

    synchronized (this) {
      reset();
    }
    // Let threads waiting on the balancer know this move has finished.
    synchronized (Balancer.this) {
      Balancer.this.notifyAll();
    }
  }
}
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; DataOutputStream out = null; DataInputStream in = null; try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); if (dfsClient.shouldEncryptData() && !dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) { IOStreamPair encryptedStreams = DataTransferEncryptor.getEncryptedStreams( unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); unbufOut = encryptedStreams.out; unbufIn = encryptedStreams.in; } out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, targets); out.flush(); //ack BlockOpResponseProto response = BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { throw new IOException("Failed to add a datanode"); } } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); IOUtils.closeSocket(sock); } }
/**
 * Carries out this pending block move against the target datanode.
 *
 * <p>Connects to {@code target.datanode.getXferAddr()}, applies a read
 * timeout and keep-alive, optionally switches to encrypted streams when
 * {@code nnc.getDataEncryptionKey()} is non-null, then calls
 * {@code sendRequest} followed by {@code receiveResponse}. On success the
 * block size is added to {@code bytesMoved}. On {@link IOException} the
 * failure is logged and a delay is activated on both the proxy source and
 * the target; the exception is not rethrown.
 *
 * <p>Regardless of outcome, the streams and socket are closed, the pending
 * block is removed from both nodes, {@code reset()} is invoked, and waiters
 * on the enclosing {@code Balancer} are notified.
 */
private void dispatch() {
  Socket moverSock = new Socket();
  DataOutputStream requestOut = null;
  DataInputStream replyIn = null;
  try {
    moverSock.connect(
        NetUtils.createSocketAddr(target.datanode.getXferAddr()),
        HdfsServerConstants.READ_TIMEOUT);
    // There is no reliable way to distinguish a datanode that is merely slow
    // at moving a block from one that will never finish. A long (20 minute)
    // read timeout keeps the balancer from hanging indefinitely.
    moverSock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
    moverSock.setKeepAlive(true);

    OutputStream rawOut = moverSock.getOutputStream();
    InputStream rawIn = moverSock.getInputStream();
    if (nnc.getDataEncryptionKey() != null) {
      final IOStreamPair secured = DataTransferEncryptor.getEncryptedStreams(
          rawOut, rawIn, nnc.getDataEncryptionKey());
      rawOut = secured.out;
      rawIn = secured.in;
    }

    final BufferedOutputStream bufferedOut =
        new BufferedOutputStream(rawOut, HdfsConstants.IO_FILE_BUFFER_SIZE);
    requestOut = new DataOutputStream(bufferedOut);
    final BufferedInputStream bufferedIn =
        new BufferedInputStream(rawIn, HdfsConstants.IO_FILE_BUFFER_SIZE);
    replyIn = new DataInputStream(bufferedIn);

    sendRequest(requestOut);
    receiveResponse(replyIn);
    bytesMoved.inc(block.getNumBytes());
    LOG.info("Successfully moved " + this);
  } catch (IOException ioe) {
    LOG.warn("Failed to move " + this + ": " + ioe.getMessage());
    // Either the proxy or the target may be in trouble. Back off briefly
    // before using them again, which avoids a burst of
    // "threads quota exceeded" warnings when the balancer and the datanode
    // disagree about outstanding work.
    proxySource.activateDelay(DELAY_AFTER_ERROR);
    target.activateDelay(DELAY_AFTER_ERROR);
  } finally {
    IOUtils.closeStream(requestOut);
    IOUtils.closeStream(replyIn);
    IOUtils.closeSocket(moverSock);

    // Clear this move from the pending lists on both nodes.
    proxySource.removePendingBlock(this);
    target.removePendingBlock(this);

    synchronized (this) {
      reset();
    }
    // Wake up threads waiting on the balancer.
    synchronized (Balancer.this) {
      Balancer.this.notifyAll();
    }
  }
}