public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
    Set<String> excludedNodes, long movedWinWidth, int moverThreads,
    int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
  this.nnc = nnc;
  this.excludedNodes = excludedNodes;
  this.includedNodes = includedNodes;
  this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);

  this.cluster = NetworkTopology.getInstance(conf);

  this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
  this.dispatchExecutor = dispatcherThreads == 0 ? null
      : Executors.newFixedThreadPool(dispatcherThreads);
  this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;

  this.saslClient = new SaslDataTransferClient(conf,
      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
}
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
    Set<String> excludedNodes, long movedWinWidth, int moverThreads,
    int dispatcherThreads, int maxConcurrentMovesPerNode,
    long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
  this.nnc = nnc;
  this.excludedNodes = excludedNodes;
  this.includedNodes = includedNodes;
  this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);

  this.cluster = NetworkTopology.getInstance(conf);

  this.dispatchExecutor = dispatcherThreads == 0 ? null
      : Executors.newFixedThreadPool(dispatcherThreads);
  this.moverThreadAllocator = new Allocator(moverThreads);
  this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;

  this.getBlocksSize = getBlocksSize;
  this.getBlocksMinBlockSize = getBlocksMinBlockSize;

  this.saslClient = new SaslDataTransferClient(conf,
      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
  this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);

  this.connectToDnViaHostname = conf.getBoolean(
      HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
      HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
  placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null);
}
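/*
 * Illustrative wiring only, not part of the classes above: a minimal sketch of
 * building a Dispatcher through the public constructor, given an already
 * constructed NameNodeConnector. The empty node sets and numeric arguments are
 * placeholder values, not recommended defaults.
 */
import java.util.Collections;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;

class DispatcherWiringSketch {
  static Dispatcher newDispatcher(NameNodeConnector nnc, Configuration conf) {
    Set<String> includedNodes = Collections.emptySet(); // placeholder: no include filter
    Set<String> excludedNodes = Collections.emptySet(); // placeholder: exclude nothing
    return new Dispatcher(nnc, includedNodes, excludedNodes,
        5400 * 1000L, // movedWinWidth in milliseconds (placeholder)
        1000,         // moverThreads (placeholder)
        200,          // dispatcherThreads (placeholder)
        5,            // maxConcurrentMovesPerNode (placeholder)
        conf);
  }
}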
static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
    Promise<Void> saslPromise) throws IOException {
  SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
  SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
  TrustedChannelResolver trustedChannelResolver =
      SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
  AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
  InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
  if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
    saslPromise.trySuccess(null);
    return;
  }
  DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
  if (encryptionKey != null) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("SASL client doing encrypted handshake for addr = " + addr
          + ", datanodeId = " + dnInfo);
    }
    doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
        encryptionKeyToPassword(encryptionKey.encryptionKey),
        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise);
  } else if (!UserGroupInformation.isSecurityEnabled()) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("SASL client skipping handshake in unsecured configuration for addr = "
          + addr + ", datanodeId = " + dnInfo);
    }
    saslPromise.trySuccess(null);
  } else if (dnInfo.getXferPort() < 1024) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("SASL client skipping handshake in secured configuration with "
          + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
    }
    saslPromise.trySuccess(null);
  } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("SASL client skipping handshake in secured configuration with "
          + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
    }
    saslPromise.trySuccess(null);
  } else if (saslPropsResolver != null) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("SASL client doing general handshake for addr = " + addr
          + ", datanodeId = " + dnInfo);
    }
    doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
        buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr),
        saslPromise);
  } else {
    // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
    // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a
    // rare edge case.
    if (LOG.isDebugEnabled()) {
      LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
          + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
    }
    saslPromise.trySuccess(null);
  }
}
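/*
 * Illustrative only: the first branch above skips SASL entirely whenever the
 * configured TrustedChannelResolver vouches for the channel or the peer
 * address. A minimal custom resolver might look like the hypothetical sketch
 * below, which trusts only loopback peers; it is not part of Hadoop.
 */
import java.net.InetAddress;

import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;

public class LoopbackTrustedChannelResolver extends TrustedChannelResolver {
  @Override
  public boolean isTrusted() {
    // The local side is not blanket-trusted; decide per peer address instead.
    return false;
  }

  @Override
  public boolean isTrusted(InetAddress peerAddress) {
    return peerAddress.isLoopbackAddress();
  }
}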
NameNodeConnector(URI nameNodeUri, Configuration conf) throws IOException {
  this.nameNodeUri = nameNodeUri;

  this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
      .getProxy();
  this.client = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
      .getProxy();
  this.fs = FileSystem.get(nameNodeUri, conf);

  final NamespaceInfo namespaceinfo = namenode.versionRequest();
  this.blockpoolID = namespaceinfo.getBlockPoolID();

  final ExportedBlockKeys keys = namenode.getBlockKeys();
  this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
  if (isBlockTokenEnabled) {
    long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
    long blockTokenLifetime = keys.getTokenLifetime();
    LOG.info("Block token params received from NN: keyUpdateInterval="
        + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
        + blockTokenLifetime / (60 * 1000) + " min(s)");
    String encryptionAlgorithm = conf.get(
        DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
    this.blockTokenSecretManager = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, blockpoolID, encryptionAlgorithm);
    this.blockTokenSecretManager.addKeys(keys);
    /*
     * Balancer should sync its block keys with NN more frequently than NN
     * updates its block keys
     */
    this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
    LOG.info("Balancer will update its block keys every "
        + keyUpdaterInterval / (60 * 1000) + " minute(s)");
    this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
    this.shouldRun = true;
    this.keyupdaterthread.start();
  }
  this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
      .getEncryptDataTransfer();
  // Check if there is another balancer running.
  // Exit if there is another one running.
  out = checkAndMarkRunningBalancer();
  if (out == null) {
    throw new IOException("Another balancer is running");
  }
  this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
}
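/*
 * The BlockKeyUpdater daemon started above is not shown here. A plausible
 * minimal sketch of such an updater, written as it would appear as an inner
 * class of NameNodeConnector and reusing the fields referenced in the
 * constructor (shouldRun, blockTokenSecretManager, namenode,
 * keyUpdaterInterval), assuming it only needs to re-fetch the block keys on
 * that interval:
 */
class BlockKeyUpdater implements Runnable {
  @Override
  public void run() {
    try {
      while (shouldRun) {
        try {
          // Pull the current keys from the NameNode and feed them to the
          // local secret manager.
          blockTokenSecretManager.addKeys(namenode.getBlockKeys());
        } catch (IOException e) {
          LOG.error("Failed to update block keys", e);
        }
        Thread.sleep(keyUpdaterInterval);
      }
    } catch (InterruptedException e) {
      // Shutting down; let the thread exit.
    }
  }
}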
public DNConf(Configuration conf) {
  socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
      HdfsServerConstants.READ_TIMEOUT);
  socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
      HdfsServerConstants.WRITE_TIMEOUT);
  socketKeepaliveTimeout = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
      DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);

  /* Based on results on different platforms, we might need to set the default
   * to false on some of them. */
  transferToAllowed = conf.getBoolean(
      DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
      DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);

  writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
      DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);

  readaheadLength = conf.getLong(
      DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
      DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
  dropCacheBehindWrites = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
      DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
  syncBehindWrites = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
      DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
  dropCacheBehindReads = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
      DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
  connectToDnViaHostname = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
  this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
      DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
  this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
      DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
  this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
      DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);

  long initBRDelay = conf.getLong(
      DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
      DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
  if (initBRDelay >= blockReportInterval) {
    initBRDelay = 0;
    DataNode.LOG.info("dfs.blockreport.initialDelay is greater than "
        + "dfs.blockreport.intervalMsec. Setting initial delay to 0 msec.");
  }
  initialBlockReportDelay = initBRDelay;

  heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
      DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;

  this.deleteReportInterval = 100 * heartBeatInterval;
  // do we need to sync block file contents to disk when blockfile is closed?
  this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
      DFS_DATANODE_SYNCONCLOSE_DEFAULT);

  this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY,
      DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT);

  this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
      DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
  this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
  this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);

  this.xceiverStopTimeout = conf.getLong(
      DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
      DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);

  this.maxLockedMemory = conf.getLong(
      DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
      DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);

  this.restartReplicaExpiry = conf.getLong(
      DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
      DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
}
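/*
 * Illustrative only: a minimal sketch of overriding a few of the settings read
 * above before constructing DNConf. The chosen values are arbitrary examples,
 * not recommendations.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.DNConf;

class DNConfSketch {
  static DNConf buildExampleDNConf() {
    Configuration conf = new Configuration();
    // Require encrypted data transfer and sync block files to disk on close.
    conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, true);
    // Heartbeat every 3 seconds; the constructor converts this to milliseconds.
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3);
    return new DNConf(conf);
  }
}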
/**
 * Creates a new SaslDataTransferClient.
 *
 * @param conf the configuration
 * @param saslPropsResolver for determining properties of SASL negotiation
 * @param trustedChannelResolver for identifying trusted connections that do
 *          not require SASL negotiation
 * @param fallbackToSimpleAuth checked on each attempt at general SASL
 *          handshake, if true forces use of simple auth
 */
public SaslDataTransferClient(Configuration conf,
    SaslPropertiesResolver saslPropsResolver,
    TrustedChannelResolver trustedChannelResolver,
    AtomicBoolean fallbackToSimpleAuth) {
  this.conf = conf;
  this.fallbackToSimpleAuth = fallbackToSimpleAuth;
  this.saslPropsResolver = saslPropsResolver;
  this.trustedChannelResolver = trustedChannelResolver;
}
/**
 * Creates a new SaslDataTransferClient. This constructor is used in cases
 * where it is not relevant to track if a secure client did a fallback to
 * simple auth. For intra-cluster connections between data nodes in the same
 * cluster, we can assume that all run under the same security configuration.
 *
 * @param conf the configuration
 * @param saslPropsResolver for determining properties of SASL negotiation
 * @param trustedChannelResolver for identifying trusted connections that do
 *          not require SASL negotiation
 */
public SaslDataTransferClient(Configuration conf,
    SaslPropertiesResolver saslPropsResolver,
    TrustedChannelResolver trustedChannelResolver) {
  this(conf, saslPropsResolver, trustedChannelResolver, null);
}
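/*
 * Illustrative only: building the client from a Configuration with the same
 * helpers the Dispatcher constructors above use. The three-argument
 * constructor is the intra-cluster form that does not track fallback to
 * simple auth.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;

class SaslClientWiringSketch {
  static SaslDataTransferClient newIntraClusterClient(Configuration conf) {
    return new SaslDataTransferClient(conf,
        DataTransferSaslUtil.getSaslPropertiesResolver(conf),
        TrustedChannelResolver.getInstance(conf));
  }
}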
/**
 * Returns the TrustedChannelResolver configured for use with
 * DataTransferProtocol, or null if not configured.
 *
 * @return TrustedChannelResolver configured for use with DataTransferProtocol
 */
public TrustedChannelResolver getTrustedChannelResolver() {
  return trustedChannelResolver;
}
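/*
 * Illustrative only: a caller holding a SaslDataTransferClient can consult the
 * resolver the same way the first branch of trySaslNegotiate() above does, to
 * decide whether a SASL handshake is needed at all. The peerAddr parameter is
 * a placeholder for the remote peer's address.
 */
import java.net.InetAddress;

import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;

class TrustCheckSketch {
  static boolean channelIsTrusted(SaslDataTransferClient saslClient, InetAddress peerAddr) {
    TrustedChannelResolver resolver = saslClient.getTrustedChannelResolver();
    // A trusted channel needs no SASL negotiation; mirror the short-circuit above.
    return resolver != null
        && (resolver.isTrusted() || resolver.isTrusted(peerAddr));
  }
}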
TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);