/** * Connect to the given datanode's datantrasfer port, and return * the resulting IOStreamPair. This includes encryption wrapping, etc. */ public static IOStreamPair connectToDN(DatanodeInfo dn, int timeout, Configuration conf, SaslDataTransferClient saslClient, SocketFactory socketFactory, boolean connectToDnViaHostname, DataEncryptionKeyFactory dekFactory, Token<BlockTokenIdentifier> blockToken) throws IOException { boolean success = false; Socket sock = null; try { sock = socketFactory.createSocket(); String dnAddr = dn.getXferAddr(connectToDnViaHostname); LOG.debug("Connecting to datanode {}", dnAddr); NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); sock.setSoTimeout(timeout); OutputStream unbufOut = NetUtils.getOutputStream(sock); InputStream unbufIn = NetUtils.getInputStream(sock); IOStreamPair pair = saslClient.newSocketSend(sock, unbufOut, unbufIn, dekFactory, blockToken, dn); IOStreamPair result = new IOStreamPair( new DataInputStream(pair.in), new DataOutputStream(new BufferedOutputStream(pair.out, NuCypherExtUtilClient.getSmallBufferSize(conf))) ); success = true; return result; } finally { if (!success) { IOUtils.closeSocket(sock); } } }
public static Peer peerFromSocketAndKey( SaslDataTransferClient saslClient, Socket s, DataEncryptionKeyFactory keyFactory, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; boolean success = false; try { peer = peerFromSocket(s); peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId); success = true; return peer; } finally { if (!success) { IOUtilsClient.cleanup(null, peer); } } }
/** * Checks if an address is already trusted and then sends client SASL * negotiation if required. * * @param addr connection address * @param underlyingOut connection output stream * @param underlyingIn connection input stream * @param encryptionKeyFactory for creation of an encryption key * @param accessToken connection block access token * @param datanodeId ID of destination DataNode * @return new pair of streams, wrapped after SASL negotiation * @throws IOException for any error */ private IOStreamPair checkTrustAndSend(InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) throws IOException { if (!trustedChannelResolver.isTrusted() && !trustedChannelResolver.isTrusted(addr)) { // The encryption key factory only returns a key if encryption is enabled. DataEncryptionKey encryptionKey = encryptionKeyFactory.newDataEncryptionKey(); return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken, datanodeId); } else { LOG.debug( "SASL client skipping handshake on trusted connection for addr = {}, " + "datanodeId = {}", addr, datanodeId); return null; } }
@Override public void readBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final long blockOffset, final long length, final boolean sendChecksum, final CachingStrategy cachingStrategy) throws IOException { OpReadBlockProto proto = OpReadBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) .setOffset(blockOffset) .setLen(length) .setSendChecksums(sendChecksum) .setCachingStrategy(getCachingStrategy(cachingStrategy)) .build(); send(out, Op.READ_BLOCK, proto); }
@Override public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) .addAllTargets(PBHelper.convert(targets)) .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) .build(); send(out, Op.TRANSFER_BLOCK, proto); }
@Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, SlotId slotId, int maxVersion, boolean supportsReceiptVerification) throws IOException { OpRequestShortCircuitAccessProto.Builder builder = OpRequestShortCircuitAccessProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader( blk, blockToken)).setMaxVersion(maxVersion); if (slotId != null) { builder.setSlotId(PBHelper.convert(slotId)); } builder.setSupportsReceiptVerification(supportsReceiptVerification); OpRequestShortCircuitAccessProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); }
@Override // RemotePeerFactory public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; boolean success = false; Socket sock = null; try { sock = socketFactory.createSocket(); NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), dfsClientConf.socketTimeout); peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, blockToken, datanodeId); peer.setReadTimeout(dfsClientConf.socketTimeout); success = true; return peer; } finally { if (!success) { IOUtils.cleanup(LOG, peer); IOUtils.closeSocket(sock); } } }
@Override public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( getOutputStream()); try { datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } catch (IOException ioe) { LOG.info("transferBlock " + blk + " received exception " + ioe); incrDatanodeNetworkErrors(); throw ioe; } finally { IOUtils.closeStream(out); } }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); Preconditions.checkNotNull(data, "Storage not yet initialized"); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata( String bpId, long[] blockIds, List<Token<BlockTokenIdentifier>> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } if (blockIds.length != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block for (int i = 0; i < blockIds.length; i++) { checkBlockToken(new ExtendedBlock(bpId, blockIds[i]), tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } DataNodeFaultInjector.get().getHdfsBlocksMetadata(); return data.getHdfsBlocksMetadata(bpId, blockIds); }
private void checkReadAccess(final ExtendedBlock block) throws IOException { if (isBlockTokenEnabled) { Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); if (tokenIds.size() != 1) { throw new IOException("Can't continue since none or more than one " + "BlockTokenIdentifier is found."); } for (TokenIdentifier tokenId : tokenIds) { BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; if (LOG.isDebugEnabled()) { LOG.debug("Got: " + id.toString()); } blockPoolTokenSecretManager.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.READ); } } }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { GetBlockLocalPathInfoRequestProto req = GetBlockLocalPathInfoRequestProto.newBuilder() .setBlock(PBHelper.convert(block)) .setToken(PBHelper.convert(token)).build(); GetBlockLocalPathInfoResponseProto resp; try { resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } return new BlockLocalPathInfo(PBHelper.convert(resp.getBlock()), resp.getLocalPath(), resp.getLocalMetaPath()); }
public static Peer peerFromSocketAndKey( SaslDataTransferClient saslClient, Socket s, DataEncryptionKeyFactory keyFactory, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; boolean success = false; try { peer = peerFromSocket(s); peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId); success = true; return peer; } finally { if (!success) { IOUtils.cleanup(null, peer); } } }
/** For {@link TestTransferRbw} */ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { assertEquals(2, datanodes.length); final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0], datanodes.length, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(s, writeTimeout), HdfsConstants.SMALL_BUFFER_SIZE)); final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); // send the request new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, new StorageType[]{StorageType.DEFAULT}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); }
private LocatedBlock createLocatedBlock() { DatanodeInfo[] dnInfos = { DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1", AdminStates.DECOMMISSION_INPROGRESS), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2", AdminStates.DECOMMISSIONED), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", AdminStates.NORMAL), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4", AdminStates.NORMAL), }; String[] storageIDs = {"s1", "s2", "s3", "s4"}; StorageType[] media = { StorageType.DISK, StorageType.SSD, StorageType.DISK, StorageType.RAM_DISK }; LocatedBlock lb = new LocatedBlock( new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, storageIDs, media, 5, false, new DatanodeInfo[]{}); lb.setBlockToken(new Token<BlockTokenIdentifier>( "identifier".getBytes(), "password".getBytes(), new Text("kind"), new Text("service"))); return lb; }
private LocatedBlock createLocatedBlockNoStorageMedia() { DatanodeInfo[] dnInfos = { DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1", AdminStates.DECOMMISSION_INPROGRESS), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2", AdminStates.DECOMMISSIONED), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", AdminStates.NORMAL) }; LocatedBlock lb = new LocatedBlock( new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false); lb.setBlockToken(new Token<BlockTokenIdentifier>( "identifier".getBytes(), "password".getBytes(), new Text("kind"), new Text("service"))); return lb; }
@Override public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) .addAllTargets(PBHelperClient.convert(targets)) .addAllTargetStorageTypes( PBHelperClient.convertStorageTypes(targetStorageTypes)) .build(); send(out, Op.TRANSFER_BLOCK, proto); }
@Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, SlotId slotId, int maxVersion, boolean supportsReceiptVerification) throws IOException { OpRequestShortCircuitAccessProto.Builder builder = OpRequestShortCircuitAccessProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader( blk, blockToken)).setMaxVersion(maxVersion); if (slotId != null) { builder.setSlotId(PBHelperClient.convert(slotId)); } builder.setSupportsReceiptVerification(supportsReceiptVerification); OpRequestShortCircuitAccessProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); }
@Override // RemotePeerFactory public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; boolean success = false; Socket sock = null; final int socketTimeout = dfsClientConf.getSocketTimeout(); try { sock = socketFactory.createSocket(); NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout); peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this, blockToken, datanodeId); peer.setReadTimeout(socketTimeout); peer.setWriteTimeout(socketTimeout); success = true; return peer; } finally { if (!success) { IOUtilsClient.cleanup(LOG, peer); IOUtils.closeSocket(sock); } } }
/** * This method creates an internal block at the given index of a block group * * @param idxInReturnedLocs The index in the stored locations in the * {@link LocatedStripedBlock} object * @param idxInBlockGroup The logical index in the striped block group * @return The constructed internal block */ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, int idxInReturnedLocs, int cellSize, int dataBlkNum, int idxInBlockGroup) { final ExtendedBlock blk = constructInternalBlock( bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup); final LocatedBlock locatedBlock; if (idxInReturnedLocs < bg.getLocations().length) { locatedBlock = new LocatedBlock(blk, new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, bg.getStartOffset(), bg.isCorrupt(), null); } else { locatedBlock = new LocatedBlock(blk, null, null, null, bg.getStartOffset(), bg.isCorrupt(), null); } Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens(); if (idxInReturnedLocs < blockTokens.length) { locatedBlock.setBlockToken(blockTokens[idxInReturnedLocs]); } return locatedBlock; }
/** * Get a new generation stamp together with an access token for * a block under construction * * This method is called for recovering a failed write or setting up * a block for appended. * * @param block a block * @param clientName the name of a client * @return a located block with a new generation stamp and an access token * @throws IOException if any error occurs */ LocatedBlock bumpBlockGenerationStamp(ExtendedBlock block, String clientName) throws IOException { final LocatedBlock locatedBlock; checkOperation(OperationCategory.WRITE); writeLock(); try { checkOperation(OperationCategory.WRITE); // check vadility of parameters final INodeFile file = checkUCBlock(block, clientName); // get a new generation stamp and an access token block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock()))); locatedBlock = BlockManager.newLocatedBlock( block, file.getLastBlock(), null, -1); blockManager.setBlockToken(locatedBlock, BlockTokenIdentifier.AccessMode.WRITE); } finally { writeUnlock(); } // Ensure we record the new generation stamp getEditLog().logSync(); return locatedBlock; }
/** Generate a block token for the located block. */ public void setBlockToken(final LocatedBlock b, final AccessMode mode) throws IOException { if (isBlockTokenEnabled()) { // Use cached UGI if serving RPC calls. if (b.isStriped()) { Preconditions.checkState(b instanceof LocatedStripedBlock); LocatedStripedBlock sb = (LocatedStripedBlock) b; byte[] indices = sb.getBlockIndices(); Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length]; ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock()); for (int i = 0; i < indices.length; i++) { internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]); blockTokens[i] = blockTokenSecretManager.generateToken( NameNode.getRemoteUser().getShortUserName(), internalBlock, EnumSet.of(mode)); } sb.setBlockTokens(blockTokens); } else { b.setBlockToken(blockTokenSecretManager.generateToken( NameNode.getRemoteUser().getShortUserName(), b.getBlock(), EnumSet.of(mode))); } } }
@Override public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( getOutputStream()); try { datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } catch (IOException ioe) { LOG.info("transferBlock " + blk + " received exception " + ioe); incrDatanodeNetworkErrors(); throw ioe; } finally { IOUtils.closeStream(out); } }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ); Preconditions.checkNotNull(data, "Storage not yet initialized"); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
private void checkReadAccess(final ExtendedBlock block) throws IOException { if (isBlockTokenEnabled) { Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); if (tokenIds.size() != 1) { throw new IOException("Can't continue since none or more than one " + "BlockTokenIdentifier is found."); } for (TokenIdentifier tokenId : tokenIds) { BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; if (LOG.isDebugEnabled()) { LOG.debug("Got: " + id.toString()); } blockPoolTokenSecretManager.checkAccess(id, null, block, BlockTokenIdentifier.AccessMode.READ); } } }
private BlockReader newBlockReader(final ExtendedBlock block, long offsetInBlock, DatanodeInfo dnInfo) { if (offsetInBlock >= block.getNumBytes()) { return null; } try { InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo); Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken( block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ)); /* * This can be further improved if the replica is local, then we can * read directly from DN and need to check the replica is FINALIZED * state, notice we should not use short-circuit local read which * requires config for domain-socket in UNIX or legacy config in Windows. * * TODO: add proper tracer */ return RemoteBlockReader2.newBlockReader( "dummy", block, blockToken, offsetInBlock, block.getNumBytes() - offsetInBlock, true, "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, null, cachingStrategy, datanode.getTracer()); } catch (IOException e) { return null; } }
/** For {@link TestTransferRbw} */ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { assertEquals(2, datanodes.length); final Socket s = DataStreamer.createSocketForPipeline(datanodes[0], datanodes.length, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(s, writeTimeout), DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration()))); final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); // send the request new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, new StorageType[]{StorageType.DEFAULT}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); }
private LocatedBlock createLocatedBlockNoStorageMedia() { DatanodeInfo[] dnInfos = { DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1", AdminStates.DECOMMISSION_INPROGRESS), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2", AdminStates.DECOMMISSIONED), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", AdminStates.NORMAL) }; LocatedBlock lb = new LocatedBlock( new ExtendedBlock("bp12", 12345, 10, 53), dnInfos); lb.setBlockToken(new Token<BlockTokenIdentifier>( "identifier".getBytes(), "password".getBytes(), new Text("kind"), new Text("service"))); lb.setStartOffset(5); return lb; }