private ClientContext(String name, Conf conf) { this.name = name; this.confString = confAsString(conf); this.shortCircuitCache = new ShortCircuitCache( conf.shortCircuitStreamsCacheSize, conf.shortCircuitStreamsCacheExpiryMs, conf.shortCircuitMmapCacheSize, conf.shortCircuitMmapCacheExpiryMs, conf.shortCircuitMmapCacheRetryTimeout, conf.shortCircuitCacheStaleThresholdMs, conf.shortCircuitSharedMemoryWatcherInterruptCheckMs); this.peerCache = new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry); this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs); this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal; this.domainSocketFactory = new DomainSocketFactory(conf); this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf); }
public DomainSocketFactory(Conf conf) { final String feature; if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) { feature = "The short-circuit local reads feature"; } else if (conf.isDomainSocketDataTraffic()) { feature = "UNIX domain socket data traffic"; } else { feature = null; } if (feature == null) { PerformanceAdvisory.LOG.debug( "Both short-circuit local reads and UNIX domain socket are disabled."); } else { if (conf.getDomainSocketPath().isEmpty()) { throw new HadoopIllegalArgumentException(feature + " is enabled but " + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set."); } else if (DomainSocket.getLoadingFailureReason() != null) { LOG.warn(feature + " cannot be used because " + DomainSocket.getLoadingFailureReason()); } else { LOG.debug(feature + " is enabled."); } } }
/** * Get information about a domain socket path. * * @param addr The inet address to use. * @param conf The client configuration. * * @return Information about the socket path. */ public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) { // If there is no domain socket path configured, we can't use domain // sockets. if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED; // If we can't do anything with the domain socket, don't create it. if (!conf.isDomainSocketDataTraffic() && (!conf.isShortCircuitLocalReads() || conf.isUseLegacyBlockReaderLocal())) { return PathInfo.NOT_CONFIGURED; } // If the DomainSocket code is not loaded, we can't create // DomainSocket objects. if (DomainSocket.getLoadingFailureReason() != null) { return PathInfo.NOT_CONFIGURED; } // UNIX domain sockets can only be used to talk to local peers if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED; String escapedPath = DomainSocket.getEffectivePath( conf.getDomainSocketPath(), addr.getPort()); PathState status = pathMap.getIfPresent(escapedPath); if (status == null) { return new PathInfo(escapedPath, PathState.VALID); } else { return new PathInfo(escapedPath, status); } }
/** * Creates the namenode proxy with the passed protocol. This will handle * creation of either HA- or non-HA-enabled proxy objects, depending upon * if the provided URI is a configured logical URI. * * @param conf the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @return an object containing both the proxy and the associated * delegation token service it corresponds to * @throws IOException if there is an error creating the proxy **/ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface); if (failoverProxyProviderClass == null) { // Non-HA case return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true); } else { // HA case FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface, nameNodeUri); Conf config = new Conf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); return new ProxyAndInfo<T>(proxy, dtService); } }
public DomainSocketFactory(Conf conf) { this.conf = conf; final String feature; if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) { feature = "The short-circuit local reads feature"; } else if (conf.domainSocketDataTraffic) { feature = "UNIX domain socket data traffic"; } else { feature = null; } if (feature == null) { LOG.debug("Both short-circuit local reads and UNIX domain socket are disabled."); } else { if (conf.domainSocketPath.isEmpty()) { throw new HadoopIllegalArgumentException(feature + " is enabled but " + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set."); } else if (DomainSocket.getLoadingFailureReason() != null) { LOG.warn(feature + " cannot be used because " + DomainSocket.getLoadingFailureReason()); } else { LOG.debug(feature + " is enabled."); } } }
private ClientContext(String name, Conf conf) { this.name = name; this.confString = confAsString(conf); this.shortCircuitCache = new ShortCircuitCache( conf.shortCircuitStreamsCacheSize, conf.shortCircuitStreamsCacheExpiryMs, conf.shortCircuitMmapCacheSize, conf.shortCircuitMmapCacheExpiryMs, conf.shortCircuitMmapCacheRetryTimeout, conf.shortCircuitCacheStaleThresholdMs, conf.shortCircuitSharedMemoryWatcherInterruptCheckMs); this.peerCache = new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry); this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal; this.domainSocketFactory = new DomainSocketFactory(conf); this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf); }
public DomainSocketFactory(Conf conf) { final String feature; if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) { feature = "The short-circuit local reads feature"; } else if (conf.domainSocketDataTraffic) { feature = "UNIX domain socket data traffic"; } else { feature = null; } if (feature == null) { LOG.debug("Both short-circuit local reads and UNIX domain socket are disabled."); } else { if (conf.domainSocketPath.isEmpty()) { throw new HadoopIllegalArgumentException(feature + " is enabled but " + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set."); } else if (DomainSocket.getLoadingFailureReason() != null) { LOG.warn(feature + " cannot be used because " + DomainSocket.getLoadingFailureReason()); } else { LOG.debug(feature + " is enabled."); } } }
/** * Get information about a domain socket path. * * @param addr The inet address to use. * @param conf The client configuration. * * @return Information about the socket path. */ public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) { // If there is no domain socket path configured, we can't use domain // sockets. if (conf.domainSocketPath.isEmpty()) return PathInfo.NOT_CONFIGURED; // If we can't do anything with the domain socket, don't create it. if (!conf.domainSocketDataTraffic && (!conf.shortCircuitLocalReads || conf.useLegacyBlockReaderLocal)) { return PathInfo.NOT_CONFIGURED; } // If the DomainSocket code is not loaded, we can't create // DomainSocket objects. if (DomainSocket.getLoadingFailureReason() != null) { return PathInfo.NOT_CONFIGURED; } // UNIX domain sockets can only be used to talk to local peers if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED; String escapedPath = DomainSocket. getEffectivePath(conf.domainSocketPath, addr.getPort()); PathState status = pathMap.getIfPresent(escapedPath); if (status == null) { return new PathInfo(escapedPath, PathState.VALID); } else { return new PathInfo(escapedPath, status); } }
private ClientContext(String name, Conf conf) { this.name = name; this.confString = confAsString(conf); this.shortCircuitCache = new ShortCircuitCache( conf.shortCircuitStreamsCacheSize, conf.shortCircuitStreamsCacheExpiryMs, conf.shortCircuitMmapCacheSize, conf.shortCircuitMmapCacheExpiryMs, conf.shortCircuitMmapCacheRetryTimeout, conf.shortCircuitCacheStaleThresholdMs, conf.shortCircuitSharedMemoryWatcherInterruptCheckMs); this.peerCache = new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry); this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal; this.domainSocketFactory = new DomainSocketFactory(conf); }
/** * Creates the namenode proxy with the passed protocol. This will handle * creation of either HA- or non-HA-enabled proxy objects, depending upon * if the provided URI is a configured logical URI. * * @param conf the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @param fallbackToSimpleAuth set to true or false during calls to indicate if * a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to * @throws IOException if there is an error creating the proxy **/ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth) throws IOException { AbstractNNFailoverProxyProvider<T> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, xface, true, fallbackToSimpleAuth); if (failoverProxyProvider == null) { // Non-HA case return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); } else { // HA case Conf config = new Conf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.maxRetryAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); Text dtService; if (failoverProxyProvider.useLogicalURI()) { dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, HdfsConstants.HDFS_URI_SCHEME); } else { dtService = SecurityUtil.buildTokenService( NameNode.getAddress(nameNodeUri)); } return new ProxyAndInfo<T>(proxy, dtService, NameNode.getAddress(nameNodeUri)); } }
public static String confAsString(Conf conf) { StringBuilder builder = new StringBuilder(); builder.append("shortCircuitStreamsCacheSize = "). append(conf.shortCircuitStreamsCacheSize). append(", shortCircuitStreamsCacheExpiryMs = "). append(conf.shortCircuitStreamsCacheExpiryMs). append(", shortCircuitMmapCacheSize = "). append(conf.shortCircuitMmapCacheSize). append(", shortCircuitMmapCacheExpiryMs = "). append(conf.shortCircuitMmapCacheExpiryMs). append(", shortCircuitMmapCacheRetryTimeout = "). append(conf.shortCircuitMmapCacheRetryTimeout). append(", shortCircuitCacheStaleThresholdMs = "). append(conf.shortCircuitCacheStaleThresholdMs). append(", socketCacheCapacity = "). append(conf.socketCacheCapacity). append(", socketCacheExpiry = "). append(conf.socketCacheExpiry). append(", shortCircuitLocalReads = "). append(conf.shortCircuitLocalReads). append(", useLegacyBlockReaderLocal = "). append(conf.useLegacyBlockReaderLocal). append(", domainSocketDataTraffic = "). append(conf.domainSocketDataTraffic). append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = "). append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs). append(", keyProviderCacheExpiryMs = "). append(conf.keyProviderCacheExpiryMs); return builder.toString(); }
public static ClientContext get(String name, Conf conf) { ClientContext context; synchronized(ClientContext.class) { context = CACHES.get(name); if (context == null) { context = new ClientContext(name, conf); CACHES.put(name, context); } else { context.printConfWarningIfNeeded(conf); } } return context; }
/** * Get a client context, from a Configuration object. * * This method is less efficient than the version which takes a DFSClient#Conf * object, and should be mostly used by tests. */ @VisibleForTesting public static ClientContext getFromConf(Configuration conf) { return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT, DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), new DFSClient.Conf(conf)); }
private void printConfWarningIfNeeded(Conf conf) { String existing = this.getConfString(); String requested = confAsString(conf); if (!existing.equals(requested)) { if (!printedConfWarning) { printedConfWarning = true; LOG.warn("Existing client context '" + name + "' does not match " + "requested configuration. Existing: " + existing + ", Requested: " + requested); } } }
public static String confAsString(Conf conf) { StringBuilder builder = new StringBuilder(); builder.append("shortCircuitStreamsCacheSize = "). append(conf.shortCircuitStreamsCacheSize). append(", shortCircuitStreamsCacheExpiryMs = "). append(conf.shortCircuitStreamsCacheExpiryMs). append(", shortCircuitMmapCacheSize = "). append(conf.shortCircuitMmapCacheSize). append(", shortCircuitMmapCacheExpiryMs = "). append(conf.shortCircuitMmapCacheExpiryMs). append(", shortCircuitMmapCacheRetryTimeout = "). append(conf.shortCircuitMmapCacheRetryTimeout). append(", shortCircuitCacheStaleThresholdMs = "). append(conf.shortCircuitCacheStaleThresholdMs). append(", socketCacheCapacity = "). append(conf.socketCacheCapacity). append(", socketCacheExpiry = "). append(conf.socketCacheExpiry). append(", shortCircuitLocalReads = "). append(conf.shortCircuitLocalReads). append(", useLegacyBlockReaderLocal = "). append(conf.useLegacyBlockReaderLocal). append(", domainSocketDataTraffic = "). append(conf.domainSocketDataTraffic). append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = "). append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs); return builder.toString(); }