@Override public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( getOutputStream()); try { datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } catch (IOException ioe) { LOG.info("transferBlock " + blk + " received exception " + ioe); incrDatanodeNetworkErrors(); throw ioe; } finally { IOUtils.closeStream(out); } }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); Preconditions.checkNotNull(data, "Storage not yet initialized"); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata( String bpId, long[] blockIds, List<Token<BlockTokenIdentifier>> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } if (blockIds.length != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block for (int i = 0; i < blockIds.length; i++) { checkBlockToken(new ExtendedBlock(bpId, blockIds[i]), tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } DataNodeFaultInjector.get().getHdfsBlocksMetadata(); return data.getHdfsBlocksMetadata(bpId, blockIds); }
private void checkReadAccess(final ExtendedBlock block) throws IOException { if (isBlockTokenEnabled) { Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); if (tokenIds.size() != 1) { throw new IOException("Can't continue since none or more than one " + "BlockTokenIdentifier is found."); } for (TokenIdentifier tokenId : tokenIds) { BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; if (LOG.isDebugEnabled()) { LOG.debug("Got: " + id.toString()); } blockPoolTokenSecretManager.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.READ); } } }
@Test public void ensureSerialNumbersNeverOverlap() { BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager() .getBlockTokenSecretManager(); BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager() .getBlockTokenSecretManager(); BlockTokenSecretManager btsm3 = cluster.getNamesystem(2).getBlockManager() .getBlockTokenSecretManager(); setAndCheckSerialNumber(0, btsm1, btsm2, btsm3); setAndCheckSerialNumber(Integer.MAX_VALUE, btsm1, btsm2, btsm3); setAndCheckSerialNumber(Integer.MIN_VALUE, btsm1, btsm2, btsm3); setAndCheckSerialNumber(Integer.MAX_VALUE / 2, btsm1, btsm2, btsm3); setAndCheckSerialNumber(Integer.MIN_VALUE / 2, btsm1, btsm2, btsm3); setAndCheckSerialNumber(Integer.MAX_VALUE / 3, btsm1, btsm2, btsm3); setAndCheckSerialNumber(Integer.MIN_VALUE / 3, btsm1, btsm2, btsm3); }
private void setAndCheckSerialNumber(int serialNumber, BlockTokenSecretManager... btsms) { for (BlockTokenSecretManager btsm : btsms) { btsm.setSerialNo(serialNumber); } for (int i = 0; i < btsms.length; i++) { for (int j = 0; j < btsms.length; j++) { if (j == i) { continue; } int first = btsms[i].getSerialNoForTesting(); int second = btsms[j].getSerialNoForTesting(); assertFalse("Overlap found for set serial number (" + serialNumber + ") is " + i + ": " + first + " == " + j + ": " + second, first == second); } } }
@Override public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( getOutputStream()); try { datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); writeResponse(Status.SUCCESS, null, out); } finally { IOUtils.closeStream(out); } }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, List<Token<BlockTokenIdentifier>> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } if (blocks.size() != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block for (int i = 0; i < blocks.size(); i++) { checkBlockToken(blocks.get(i), tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } return data.getHdfsBlocksMetadata(blocks); }
private void checkWriteAccess(final ExtendedBlock block) throws IOException { if (isBlockTokenEnabled) { Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); if (tokenIds.size() != 1) { throw new IOException("Can't continue since none or more than one " + "BlockTokenIdentifier is found."); } for (TokenIdentifier tokenId : tokenIds) { BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; if (LOG.isDebugEnabled()) { LOG.debug("Got: " + id.toString()); } blockPoolTokenSecretManager.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.READ); } } }
private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS, String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl", new DatanodeInfo[1], null, stage, 0, block.getNumBytes(), block.getNumBytes(), newGS, DEFAULT_CHECKSUM); if (eofExcepted) { sendResponse(Status.ERROR, null, null, recvOut); sendRecvData(description, true); } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { //ok finally write a block with 0 len sendResponse(Status.SUCCESS, "", null, recvOut); sendRecvData(description, false); } else { writeZeroLengthPacket(block, description); } }
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, source.getStorageID(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); return proto.getStatus() == Status.SUCCESS; }
@Override public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( getOutputStream()); try { datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } finally { IOUtils.closeStream(out); } }
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, StorageType.DEFAULT, BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); while (proto.getStatus() == Status.IN_PROGRESS) { proto = BlockOpResponseProto.parseDelimitedFrom(reply); } return proto.getStatus() == Status.SUCCESS; }
@Override public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets) throws IOException { checkAccess(null, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream(getOutputStream()); try { datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); writeResponse(Status.SUCCESS, null, out); } finally { IOUtils.closeStream(out); } }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace( "getBlockLocalPathInfo for block=" + block + " returning null"); } } } metrics.incrBlocksGetLocalPathInfo(); return info; }
@Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, List<Token<BlockTokenIdentifier>> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException( "Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } if (blocks.size() != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block for (int i = 0; i < blocks.size(); i++) { checkBlockToken(blocks.get(i), tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } return data.getHdfsBlocksMetadata(blocks); }
private void checkWriteAccess(final ExtendedBlock block) throws IOException { if (isBlockTokenEnabled) { Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser().getTokenIdentifiers(); if (tokenIds.size() != 1) { throw new IOException("Can't continue since none or more than one " + "BlockTokenIdentifier is found."); } for (TokenIdentifier tokenId : tokenIds) { BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; if (LOG.isDebugEnabled()) { LOG.debug("Got: " + id.toString()); } blockPoolTokenSecretManager.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.READ); } } }
private boolean replaceBlock(ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, source.getStorageID(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); return proto.getStatus() == Status.SUCCESS; }
private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS, String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl", new DatanodeInfo[1], null, stage, 0, block.getNumBytes(), block.getNumBytes(), newGS, DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); if (eofExcepted) { sendResponse(Status.ERROR, null, null, recvOut); sendRecvData(description, true); } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { //ok finally write a block with 0 len sendResponse(Status.SUCCESS, "", null, recvOut); sendRecvData(description, false); } else { writeZeroLengthPacket(block, description); } }
private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); BlockTokenSecretManager.DUMMY_TOKEN.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); short status = reply.readShort(); if(status == DataTransferProtocol.OP_STATUS_SUCCESS) { return true; } return false; }
private void sendRequest(DataOutputStream out) throws IOException { out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); out.writeLong(block.getBlock().getBlockId()); out.writeLong(block.getBlock().getGenerationStamp()); Text.writeString(out, source.getStorageID()); proxySource.write(out); Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN; if (isBlockTokenEnabled) { accessToken = blockTokenSecretManager.generateToken(null, block.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE, BlockTokenSecretManager.AccessMode.COPY)); } accessToken.write(out); out.flush(); }
@Override public BlockLocalPathInfo getBlockLocalPathInfo(Block block, Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo successful block=" + block + " blockfile " + info.getBlockPath() + " metafile " + info.getMetaPath()); } } else { if (LOG.isTraceEnabled()) { LOG.trace("getBlockLocalPathInfo for block=" + block + " returning null"); } } } myMetrics.incrBlocksGetLocalPathInfo(); return info; }