/**
 * Reads one SASL negotiation message from the stream and collects the cipher
 * options that were sent along with it.
 *
 * @param in stream to read
 * @param cipherOptions list that receives each cipher option carried by the
 *          message, appended in the order the message lists them
 * @return byte[] payload of the SASL negotiation message
 * @throws IOException for any error; an InvalidEncryptionKeyException is
 *           thrown when the message status is ERROR_UNKNOWN_KEY, and a plain
 *           IOException when it is ERROR
 */
public static byte[] readSaslMessageAndNegotiationCipherOptions(
    InputStream in, List<CipherOption> cipherOptions) throws IOException {
  final DataTransferEncryptorMessageProto message =
      DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));

  // Error statuses are surfaced as exceptions carrying the peer's message.
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
    throw new InvalidEncryptionKeyException(message.getMessage());
  }
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR) {
    throw new IOException(message.getMessage());
  }

  final List<CipherOptionProto> offered = message.getCipherOptionList();
  if (offered != null) {
    for (CipherOptionProto candidate : offered) {
      cipherOptions.add(PBHelper.convert(candidate));
    }
  }
  return message.getPayload().toByteArray();
}
/**
 * Reads one SASL message from the server together with the cipher option it
 * negotiated, if any.
 *
 * @param in stream to read
 * @return SaslResponseWithNegotiatedCipherOption holding the message payload
 *         and the first cipher option of the message, or a null cipher
 *         option when the message carries none
 * @throws IOException for any error; an InvalidEncryptionKeyException is
 *           thrown when the message status is ERROR_UNKNOWN_KEY, and a plain
 *           IOException when it is ERROR
 */
public static SaslResponseWithNegotiatedCipherOption
    readSaslMessageAndNegotiatedCipherOption(InputStream in)
    throws IOException {
  final DataTransferEncryptorMessageProto message =
      DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));

  // Error statuses are surfaced as exceptions carrying the peer's message.
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
    throw new InvalidEncryptionKeyException(message.getMessage());
  }
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR) {
    throw new IOException(message.getMessage());
  }

  final byte[] payload = message.getPayload().toByteArray();
  final List<CipherOption> negotiated =
      PBHelper.convertCipherOptionProtos(message.getCipherOptionList());
  // Only the first entry is used as the negotiated option.
  final CipherOption chosen =
      (negotiated == null || negotiated.isEmpty()) ? null : negotiated.get(0);
  return new SaslResponseWithNegotiatedCipherOption(payload, chosen);
}
/**
 * Reads one SASL negotiation message from the stream and collects the cipher
 * options that were sent along with it.
 *
 * @param in stream to read
 * @param cipherOptions list that receives each cipher option carried by the
 *          message, appended in the order the message lists them
 * @return byte[] payload of the SASL negotiation message
 * @throws IOException for any error; an InvalidEncryptionKeyException is
 *           thrown when the message status is ERROR_UNKNOWN_KEY, and a plain
 *           IOException when it is ERROR
 */
public static byte[] readSaslMessageAndNegotiationCipherOptions(
    InputStream in, List<CipherOption> cipherOptions) throws IOException {
  final DataTransferEncryptorMessageProto message =
      DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));

  // Error statuses are surfaced as exceptions carrying the peer's message.
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
    throw new InvalidEncryptionKeyException(message.getMessage());
  }
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR) {
    throw new IOException(message.getMessage());
  }

  final List<CipherOptionProto> offered = message.getCipherOptionList();
  if (offered != null) {
    for (CipherOptionProto candidate : offered) {
      cipherOptions.add(PBHelperClient.convert(candidate));
    }
  }
  return message.getPayload().toByteArray();
}
/**
 * Reads one SASL message from the server together with the cipher option it
 * negotiated, if any.
 *
 * @param in stream to read
 * @return SaslResponseWithNegotiatedCipherOption holding the message payload
 *         and the first cipher option of the message, or a null cipher
 *         option when the message carries none
 * @throws IOException for any error; an InvalidEncryptionKeyException is
 *           thrown when the message status is ERROR_UNKNOWN_KEY, and a plain
 *           IOException when it is ERROR
 */
public static SaslResponseWithNegotiatedCipherOption
    readSaslMessageAndNegotiatedCipherOption(InputStream in)
    throws IOException {
  final DataTransferEncryptorMessageProto message =
      DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));

  // Error statuses are surfaced as exceptions carrying the peer's message.
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
    throw new InvalidEncryptionKeyException(message.getMessage());
  }
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR) {
    throw new IOException(message.getMessage());
  }

  final byte[] payload = message.getPayload().toByteArray();
  final List<CipherOption> negotiated =
      PBHelperClient.convertCipherOptionProtos(message.getCipherOptionList());
  // Only the first entry is used as the negotiated option.
  final CipherOption chosen =
      (negotiated == null || negotiated.isEmpty()) ? null : negotiated.get(0);
  return new SaslResponseWithNegotiatedCipherOption(payload, chosen);
}
/**
 * Recreate an encryption key based on the given key id and nonce.
 *
 * @param keyId identifier of the secret key used to generate the encryption
 *          key.
 * @param nonce random value used to create the encryption key
 * @return the encryption key computed from the nonce and the block key
 *         registered under {@code keyId}
 * @throws InvalidEncryptionKeyException if no block key is registered under
 *           {@code keyId}
 */
public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
    throws InvalidEncryptionKeyException {
  BlockKey key = null;
  synchronized (this) {
    key = allKeys.get(keyId);
    if (key == null) {
      // NOTE(review): assumes currentKey can still be unset when this runs
      // (confirm against the field's initialisation). Guarding it keeps the
      // failure an InvalidEncryptionKeyException, as declared, instead of a
      // NullPointerException raised while building the message.
      String current = (currentKey == null)
          ? "none" : String.valueOf(currentKey.getKeyId());
      throw new InvalidEncryptionKeyException("Can't re-compute encryption key"
          + " for nonce, since the required block key (keyID=" + keyId
          + ") doesn't exist. Current key: " + current);
    }
  }
  // The password is derived outside the lock; only the key lookup needs it.
  return createPassword(nonce, key.getKey());
}
/**
 * Reads a single SASL negotiation message from the stream.
 *
 * @param in stream to read
 * @return bytes of the SASL negotiation message payload
 * @throws IOException for any error; an InvalidEncryptionKeyException is
 *           thrown when the message status is ERROR_UNKNOWN_KEY, and a plain
 *           IOException when it is ERROR
 */
public static byte[] readSaslMessage(InputStream in) throws IOException {
  final DataTransferEncryptorMessageProto message =
      DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));

  // Error statuses are surfaced as exceptions carrying the peer's message.
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
    throw new InvalidEncryptionKeyException(message.getMessage());
  }
  if (message.getStatus() == DataTransferEncryptorStatus.ERROR) {
    throw new IOException(message.getMessage());
  }
  return message.getPayload().toByteArray();
}
/**
 * Recreate an encryption key based on the given key id and nonce.
 *
 * @param keyId identifier of the secret key used to generate the encryption
 *          key.
 * @param nonce random value used to create the encryption key
 * @return the encryption key computed from the nonce and the block key
 *         registered under {@code keyId}
 * @throws InvalidEncryptionKeyException if no block key is registered under
 *           {@code keyId}
 */
public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
    throws InvalidEncryptionKeyException {
  BlockKey key = null;
  synchronized (this) {
    key = allKeys.get(keyId);
    if (key == null) {
      // NOTE(review): assumes currentKey can still be unset when this runs
      // (confirm against the field's initialisation). Guarding it keeps the
      // failure an InvalidEncryptionKeyException, as declared, instead of a
      // NullPointerException raised while building the message.
      String current = (currentKey == null)
          ? "none" : String.valueOf(currentKey.getKeyId());
      throw new InvalidEncryptionKeyException("Can't re-compute encryption key"
          + " for nonce, since the required block key (keyID=" + keyId
          + ") doesn't exist. Current key: " + current);
    }
  }
  // The password is derived outside the lock; only the key lookup needs it.
  return createPassword(nonce, key.getKey());
}
/**
 * Recreate an encryption key based on the given key id and nonce.
 *
 * @param keyId
 *          identifier of the secret key used to generate the encryption key.
 * @param nonce
 *          random value used to create the encryption key
 * @return the encryption key computed from the nonce and the block key
 *         registered under {@code keyId}
 * @throws InvalidEncryptionKeyException
 *           if no block key is registered under {@code keyId}
 */
public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
    throws InvalidEncryptionKeyException {
  BlockKey key = null;
  synchronized (this) {
    key = allKeys.get(keyId);
    if (key == null) {
      // NOTE(review): assumes currentKey can still be unset when this runs
      // (confirm against the field's initialisation). Guarding it keeps the
      // failure an InvalidEncryptionKeyException, as declared, instead of a
      // NullPointerException raised while building the message.
      String current = (currentKey == null)
          ? "none" : String.valueOf(currentKey.getKeyId());
      throw new InvalidEncryptionKeyException(
          "Can't re-compute encryption key"
              + " for nonce, since the required block key (keyID=" + keyId
              + ") doesn't exist. Current key: " + current);
    }
  }
  // The password is derived outside the lock; only the key lookup needs it.
  return createPassword(nonce, key.getKey());
}
/**
 * Turns an error status on a data-transfer encryptor message into an
 * exception. Messages with any other status pass through silently.
 *
 * @param proto message whose status is inspected
 * @throws IOException an InvalidEncryptionKeyException when the status is
 *           ERROR_UNKNOWN_KEY, a plain IOException when it is ERROR; both
 *           carry the message text of {@code proto}
 */
private void check(DataTransferEncryptorMessageProto proto)
    throws IOException {
  if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
    throw new InvalidEncryptionKeyException(proto.getMessage());
  }
  if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
    throw new IOException(proto.getMessage());
  }
}
/** * This method actually executes the server-side SASL handshake. * * @param underlyingOut connection output stream * @param underlyingIn connection input stream * @param saslProps properties of SASL negotiation * @param callbackHandler for responding to SASL callbacks * @return new pair of streams, wrapped after SASL negotiation * @throws IOException for any error */ private IOStreamPair doSaslHandshake(OutputStream underlyingOut, InputStream underlyingIn, Map<String, String> saslProps, CallbackHandler callbackHandler) throws IOException { DataInputStream in = new DataInputStream(underlyingIn); DataOutputStream out = new DataOutputStream(underlyingOut); SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps, callbackHandler); int magicNumber = in.readInt(); if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) { throw new InvalidMagicNumberException(magicNumber, dnConf.getEncryptDataTransfer()); } try { // step 1 byte[] remoteResponse = readSaslMessage(in); byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); sendSaslMessage(out, localResponse); // step 2 (server-side only) List<CipherOption> cipherOptions = Lists.newArrayList(); remoteResponse = readSaslMessageAndNegotiationCipherOptions( in, cipherOptions); localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); // SASL handshake is complete checkSaslComplete(sasl, saslProps); CipherOption cipherOption = null; if (sasl.isNegotiatedQopPrivacy()) { // Negotiate a cipher option cipherOption = negotiateCipherOption(dnConf.getConf(), cipherOptions); if (cipherOption != null) { if (LOG.isDebugEnabled()) { LOG.debug("Server using cipher suite " + cipherOption.getCipherSuite().getName()); } } } // If negotiated cipher option is not null, wrap it before sending. sendSaslMessageAndNegotiatedCipherOption(out, localResponse, wrap(cipherOption, sasl)); // If negotiated cipher option is not null, we will use it to create // stream pair. return cipherOption != null ? 
createStreamPair( dnConf.getConf(), cipherOption, underlyingOut, underlyingIn, true) : sasl.createStreamPair(out, in); } catch (IOException ioe) { if (ioe instanceof SaslException && ioe.getCause() != null && ioe.getCause() instanceof InvalidEncryptionKeyException) { // This could just be because the client is long-lived and hasn't gotten // a new encryption key from the NN in a while. Upon receiving this // error, the client will get a new encryption key from the NN and retry // connecting to this DN. sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage()); } else { sendGenericSaslErrorMessage(out, ioe.getMessage()); } throw ioe; } }
/** * Open a DataInputStream to a DataNode so that it can be read from. * We get block ID and the IDs of the destinations at startup, from the namenode. */ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { if (target >= getFileLength()) { throw new IOException("Attempted to read past end of file"); } // Will be getting a new BlockReader. closeCurrentBlockReaders(); // // Connect to best DataNode for desired Block, with potential offset // DatanodeInfo chosenNode; int refetchToken = 1; // only need to get a new access token once int refetchEncryptionKey = 1; // only need to get a new encryption key once boolean connectFailedOnce = false; while (true) { // // Compute desired block // LocatedBlock targetBlock = getBlockAt(target); // update current position this.pos = target; this.blockEnd = targetBlock.getStartOffset() + targetBlock.getBlockSize() - 1; this.currentLocatedBlock = targetBlock; long offsetIntoBlock = target - targetBlock.getStartOffset(); DNAddrPair retval = chooseDataNode(targetBlock, null); chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; StorageType storageType = retval.storageType; try { blockReader = getBlockReader(targetBlock, offsetIntoBlock, targetBlock.getBlockSize() - offsetIntoBlock, targetAddr, storageType, chosenNode); if(connectFailedOnce) { DFSClient.LOG.info("Successfully connected to " + targetAddr + " for " + targetBlock.getBlock()); } return chosenNode; } catch (IOException ex) { if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + targetAddr + " : " + ex); // The encryption key used is invalid. 
refetchEncryptionKey--; dfsClient.clearDataEncryptionKey(); } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) { refetchToken--; fetchBlockAt(target); } else { connectFailedOnce = true; DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block" + ", add to deadNodes and continue. " + ex, ex); // Put chosen node into dead list, continue addToDeadNodes(chosenNode); } } } }
boolean createBlockReader(LocatedBlock block, int chunkIndex) throws IOException { BlockReader reader = null; final ReaderRetryPolicy retry = new ReaderRetryPolicy(); DNAddrPair dnInfo = new DNAddrPair(null, null, null); while(true) { try { // the cached block location might have been re-fetched, so always // get it from cache. block = refreshLocatedBlock(block); targetBlocks[chunkIndex] = block; // internal block has one location, just rule out the deadNodes dnInfo = getBestNodeDNAddrPair(block, null); if (dnInfo == null) { break; } reader = getBlockReader(block, alignedStripe.getOffsetInBlock(), block.getBlockSize() - alignedStripe.getOffsetInBlock(), dnInfo.addr, dnInfo.storageType, dnInfo.info); } catch (IOException e) { if (e instanceof InvalidEncryptionKeyException && retry.shouldRefetchEncryptionKey()) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + dnInfo.addr + " : " + e); dfsClient.clearDataEncryptionKey(); retry.refetchEncryptionKey(); } else if (retry.shouldRefetchToken() && tokenRefetchNeeded(e, dnInfo.addr)) { fetchBlockAt(block.getStartOffset()); retry.refetchToken(); } else { //TODO: handles connection issues DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " + "block" + block.getBlock(), e); // re-fetch the block in case the block has been moved fetchBlockAt(block.getStartOffset()); addToDeadNodes(dnInfo.info); } } if (reader != null) { readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info, alignedStripe.getOffsetInBlock()); return true; } } return false; }
/** * This method actually executes the server-side SASL handshake. * * @param underlyingOut connection output stream * @param underlyingIn connection input stream * @param saslProps properties of SASL negotiation * @param callbackHandler for responding to SASL callbacks * @return new pair of streams, wrapped after SASL negotiation * @throws IOException for any error */ private IOStreamPair doSaslHandshake(OutputStream underlyingOut, InputStream underlyingIn, Map<String, String> saslProps, CallbackHandler callbackHandler) throws IOException { DataInputStream in = new DataInputStream(underlyingIn); DataOutputStream out = new DataOutputStream(underlyingOut); SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps, callbackHandler); int magicNumber = in.readInt(); if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) { throw new InvalidMagicNumberException(magicNumber); } try { // step 1 byte[] remoteResponse = readSaslMessage(in); byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); sendSaslMessage(out, localResponse); // step 2 (server-side only) List<CipherOption> cipherOptions = Lists.newArrayList(); remoteResponse = readSaslMessageAndNegotiationCipherOptions( in, cipherOptions); localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); // SASL handshake is complete checkSaslComplete(sasl, saslProps); CipherOption cipherOption = null; if (sasl.isNegotiatedQopPrivacy()) { // Negotiate a cipher option cipherOption = negotiateCipherOption(dnConf.getConf(), cipherOptions); if (cipherOption != null) { if (LOG.isDebugEnabled()) { LOG.debug("Server using cipher suite " + cipherOption.getCipherSuite().getName()); } } } // If negotiated cipher option is not null, wrap it before sending. sendSaslMessageAndNegotiatedCipherOption(out, localResponse, wrap(cipherOption, sasl)); // If negotiated cipher option is not null, we will use it to create // stream pair. return cipherOption != null ? 
createStreamPair( dnConf.getConf(), cipherOption, underlyingOut, underlyingIn, true) : sasl.createStreamPair(out, in); } catch (IOException ioe) { if (ioe instanceof SaslException && ioe.getCause() != null && ioe.getCause() instanceof InvalidEncryptionKeyException) { // This could just be because the client is long-lived and hasn't gotten // a new encryption key from the NN in a while. Upon receiving this // error, the client will get a new encryption key from the NN and retry // connecting to this DN. sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage()); } else { sendGenericSaslErrorMessage(out, ioe.getMessage()); } throw ioe; } }
/**
 * Tells whether an exception is security-related.
 *
 * Such exceptions are handled differently from other IOExceptions: they do
 * not point at a communication problem but at something the client has to
 * do, such as refetching block tokens or renewing encryption keys.
 *
 * @param ioe The exception
 * @return True only if the exception is an InvalidToken, an
 *         InvalidEncryptionKeyException, an InvalidBlockTokenException or an
 *         AccessControlException.
 */
private static boolean isSecurityException(IOException ioe) {
  if (ioe instanceof InvalidToken) {
    return true;
  }
  if (ioe instanceof InvalidEncryptionKeyException) {
    return true;
  }
  if (ioe instanceof InvalidBlockTokenException) {
    return true;
  }
  return ioe instanceof AccessControlException;
}