/** * Creates the namenode proxy with the ClientProtocol. This will handle * creation of either HA- or non-HA-enabled proxy objects, depending upon * if the provided URI is a configured logical URI. * * @param conf the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param fallbackToSimpleAuth set to true or false during calls to indicate * if a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to * @throws IOException if there is an error creating the proxy * @see {@link NameNodeProxies#createProxy(Configuration, URI, Class)}. */ public static ProxyAndInfo<NuCypherExtClientProtocol> createProxyWithNuCypherExtClientProtocol( Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException { AbstractNNFailoverProxyProvider<NuCypherExtClientProtocol> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, NuCypherExtClientProtocol.class, true, fallbackToSimpleAuth); if (failoverProxyProvider == null) { InetSocketAddress nnAddr = NuCypherExtUtilClient.getNNAddress(nameNodeUri); Text dtService = SecurityUtil.buildTokenService(nnAddr); NuCypherExtClientProtocol proxy = createNonHAProxyWithNuCypherExtClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); return new ProxyAndInfo<>(proxy, dtService, nnAddr); } else { return createHAProxy(conf, nameNodeUri, NuCypherExtClientProtocol.class, failoverProxyProvider); } }
/** * Creates an explicitly HA-enabled proxy object. * * @param conf the configuration object * @param nameNodeUri the URI pointing either to a specific NameNode or to a * logical nameservice. * @param xface the IPC interface which should be created * @param failoverProxyProvider Failover proxy provider * @return an object containing both the proxy and the associated * delegation token service it corresponds to */ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createHAProxy( Configuration conf, URI nameNodeUri, Class<T> xface, AbstractNNFailoverProxyProvider<T> failoverProxyProvider) { Preconditions.checkNotNull(failoverProxyProvider); // HA case DfsClientConf config = new DfsClientConf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(), config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(), config.getFailoverSleepMaxMillis())); Text dtService; if (failoverProxyProvider.useLogicalURI()) { dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri, HdfsConstants.HDFS_URI_SCHEME); } else { dtService = SecurityUtil.buildTokenService( NuCypherExtUtilClient.getNNAddress(nameNodeUri)); } return new ProxyAndInfo<>(proxy, dtService, NuCypherExtUtilClient.getNNAddressCheckLogical(conf, nameNodeUri)); }
/** * Creates the namenode proxy with the ClientProtocol. This will handle * creation of either HA- or non-HA-enabled proxy objects, depending upon * if the provided URI is a configured logical URI. * * @param conf the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param fallbackToSimpleAuth set to true or false during calls to indicate * if a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to * @throws IOException if there is an error creating the proxy * @see {@link NameNodeProxies#createProxy(Configuration, URI, Class)}. */ public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol( Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException { AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true, fallbackToSimpleAuth); if (failoverProxyProvider == null) { InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri); Text dtService = SecurityUtil.buildTokenService(nnAddr); ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); return new ProxyAndInfo<>(proxy, dtService, nnAddr); } else { return createHAProxy(conf, nameNodeUri, ClientProtocol.class, failoverProxyProvider); } }
/** * Creates an explicitly HA-enabled proxy object. * * @param conf the configuration object * @param nameNodeUri the URI pointing either to a specific NameNode or to a * logical nameservice. * @param xface the IPC interface which should be created * @param failoverProxyProvider Failover proxy provider * @return an object containing both the proxy and the associated * delegation token service it corresponds to */ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createHAProxy( Configuration conf, URI nameNodeUri, Class<T> xface, AbstractNNFailoverProxyProvider<T> failoverProxyProvider) { Preconditions.checkNotNull(failoverProxyProvider); // HA case DfsClientConf config = new DfsClientConf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(), config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(), config.getFailoverSleepMaxMillis())); Text dtService; if (failoverProxyProvider.useLogicalURI()) { dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri, HdfsConstants.HDFS_URI_SCHEME); } else { dtService = SecurityUtil.buildTokenService( DFSUtilClient.getNNAddress(nameNodeUri)); } return new ProxyAndInfo<>(proxy, dtService, DFSUtilClient.getNNAddressCheckLogical(conf, nameNodeUri)); }
/** * Creates the namenode proxy with the passed protocol. This will handle * creation of either HA- or non-HA-enabled proxy objects, depending upon * if the provided URI is a configured logical URI. * * @param conf the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @param fallbackToSimpleAuth set to true or false during calls to indicate if * a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to * @throws IOException if there is an error creating the proxy **/ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth) throws IOException { AbstractNNFailoverProxyProvider<T> failoverProxyProvider = NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri, xface, true, fallbackToSimpleAuth); if (failoverProxyProvider == null) { return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); } else { return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface, failoverProxyProvider); } }
/** * Creates the namenode proxy with the passed protocol. This will handle * creation of either HA- or non-HA-enabled proxy objects, depending upon * if the provided URI is a configured logical URI. * * @param conf the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @param fallbackToSimpleAuth set to true or false during calls to indicate if * a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to * @throws IOException if there is an error creating the proxy **/ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth) throws IOException { AbstractNNFailoverProxyProvider<T> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, xface, true, fallbackToSimpleAuth); if (failoverProxyProvider == null) { // Non-HA case return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); } else { // HA case Conf config = new Conf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.maxRetryAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); Text dtService; if (failoverProxyProvider.useLogicalURI()) { dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, HdfsConstants.HDFS_URI_SCHEME); } else { dtService = SecurityUtil.buildTokenService( NameNode.getAddress(nameNodeUri)); } return new ProxyAndInfo<T>(proxy, dtService, NameNode.getAddress(nameNodeUri)); } }
/** * Check whether logical URI is needed for the namenode and * the corresponding failover proxy provider in the config. * * @param conf Configuration * @param nameNodeUri The URI of namenode * @return true if logical URI is needed. false, if not needed. * @throws IOException most likely due to misconfiguration. */ public static boolean useLogicalUri(Configuration conf, URI nameNodeUri) throws IOException { // Create the proxy provider. Actual proxy is not created. AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxies .createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, false, null); // No need to use logical URI since failover is not configured. if (provider == null) { return false; } // Check whether the failover proxy provider uses logical URI. return provider.useLogicalURI(); }
/** * Check whether logical URI is needed for the namenode and * the corresponding failover proxy provider in the config. * * @param conf Configuration * @param nameNodeUri The URI of namenode * @return true if logical URI is needed. false, if not needed. * @throws IOException most likely due to misconfiguration. */ public static boolean useLogicalUri(Configuration conf, URI nameNodeUri) throws IOException { // Create the proxy provider. Actual proxy is not created. AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxiesClient .createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, false, null); // No need to use logical URI since failover is not configured. if (provider == null) { return false; } // Check whether the failover proxy provider uses logical URI. return provider.useLogicalURI(); }
/** * Generate a dummy namenode proxy instance that utilizes our hacked * {@link LossyRetryInvocationHandler}. Proxy instance generated using this * method will proactively drop RPC responses. Currently this method only * support HA setup. null will be returned if the given configuration is not * for HA. * * @param config the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @param numResponseToDrop The number of responses to drop for each RPC call * @param fallbackToSimpleAuth set to true or false during calls to indicate * if a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to. Will return null of the * given configuration does not support HA. * @throws IOException if there is an error creating the proxy */ public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler( Configuration config, URI nameNodeUri, Class<T> xface, int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth) throws IOException { Preconditions.checkArgument(numResponseToDrop > 0); AbstractNNFailoverProxyProvider<T> failoverProxyProvider = createFailoverProxyProvider(config, nameNodeUri, xface, true, fallbackToSimpleAuth); if (failoverProxyProvider != null) { // HA case int delay = config.getInt( HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); int maxCap = config.getInt( HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT); int maxFailoverAttempts = config.getInt( HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); int maxRetryAttempts = config.getInt( HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT); InvocationHandler dummyHandler = new LossyRetryInvocationHandler<>( numResponseToDrop, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, Math.max(numResponseToDrop + 1, maxRetryAttempts), delay, maxCap)); @SuppressWarnings("unchecked") T proxy = (T) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[]{xface}, dummyHandler); Text dtService; if (failoverProxyProvider.useLogicalURI()) { dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri, HdfsConstants.HDFS_URI_SCHEME); } else { dtService = SecurityUtil.buildTokenService( NuCypherExtUtilClient.getNNAddress(nameNodeUri)); } return new ProxyAndInfo<>(proxy, dtService, NuCypherExtUtilClient.getNNAddress(nameNodeUri)); } else { LOG.warn("Currently creating proxy using " + "LossyRetryInvocationHandler requires NN HA setup"); return null; } }
/** Creates the Failover proxy provider instance*/ @VisibleForTesting public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth) throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; AbstractNNFailoverProxyProvider<T> providerNN; try { // Obtain the class of the proxy provider failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri); if (failoverProxyProviderClass == null) { return null; } // Create a proxy provider instance. Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, xface); // If the proxy provider is of an old implementation, wrap it. if (!(provider instanceof AbstractNNFailoverProxyProvider)) { providerNN = new WrappedFailoverProxyProvider<>(provider); } else { providerNN = (AbstractNNFailoverProxyProvider<T>)provider; } } catch (Exception e) { final String message = "Couldn't create proxy provider " + failoverProxyProviderClass; LOG.debug(message, e); if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { throw new IOException(message, e); } } // Check the port in the URI, if it is logical. if (checkPort && providerNN.useLogicalURI()) { int port = nameNodeUri.getPort(); if (port > 0 && port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) { // Throwing here without any cleanup is fine since we have not // actually created the underlying proxies yet. throw new IOException("Port " + port + " specified in URI " + nameNodeUri + " but host '" + nameNodeUri.getHost() + "' is a logical (HA) namenode" + " and does not use port information."); } } providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth); return providerNN; }
/** * Generate a dummy namenode proxy instance that utilizes our hacked * {@link LossyRetryInvocationHandler}. Proxy instance generated using this * method will proactively drop RPC responses. Currently this method only * support HA setup. null will be returned if the given configuration is not * for HA. * * @param config the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @param numResponseToDrop The number of responses to drop for each RPC call * @param fallbackToSimpleAuth set to true or false during calls to indicate if * a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to. Will return null of the * given configuration does not support HA. * @throws IOException if there is an error creating the proxy */ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler( Configuration config, URI nameNodeUri, Class<T> xface, int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth) throws IOException { Preconditions.checkArgument(numResponseToDrop > 0); AbstractNNFailoverProxyProvider<T> failoverProxyProvider = createFailoverProxyProvider(config, nameNodeUri, xface, true, fallbackToSimpleAuth); if (failoverProxyProvider != null) { // HA case int delay = config.getInt( DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); int maxCap = config.getInt( DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT); int maxFailoverAttempts = config.getInt( DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); int maxRetryAttempts = config.getInt( DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY, DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT); InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>( numResponseToDrop, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, Math.max(numResponseToDrop + 1, maxRetryAttempts), delay, maxCap)); T proxy = (T) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[] { xface }, dummyHandler); Text dtService; if (failoverProxyProvider.useLogicalURI()) { dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, HdfsConstants.HDFS_URI_SCHEME); } else { dtService = SecurityUtil.buildTokenService( NameNode.getAddress(nameNodeUri)); } return new ProxyAndInfo<T>(proxy, dtService, NameNode.getAddress(nameNodeUri)); } else { LOG.warn("Currently creating proxy using " + "LossyRetryInvocationHandler requires NN HA setup"); return null; } }
/** Creates the Failover proxy provider instance*/ @VisibleForTesting public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth) throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; AbstractNNFailoverProxyProvider<T> providerNN; Preconditions.checkArgument( xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface); try { // Obtain the class of the proxy provider failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri); if (failoverProxyProviderClass == null) { return null; } // Create a proxy provider instance. Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, xface); // If the proxy provider is of an old implementation, wrap it. if (!(provider instanceof AbstractNNFailoverProxyProvider)) { providerNN = new WrappedFailoverProxyProvider<T>(provider); } else { providerNN = (AbstractNNFailoverProxyProvider<T>)provider; } } catch (Exception e) { String message = "Couldn't create proxy provider " + failoverProxyProviderClass; if (LOG.isDebugEnabled()) { LOG.debug(message, e); } if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { throw new IOException(message, e); } } // Check the port in the URI, if it is logical. if (checkPort && providerNN.useLogicalURI()) { int port = nameNodeUri.getPort(); if (port > 0 && port != NameNode.DEFAULT_PORT) { // Throwing here without any cleanup is fine since we have not // actually created the underlying proxies yet. throw new IOException("Port " + port + " specified in URI " + nameNodeUri + " but host '" + nameNodeUri.getHost() + "' is a logical (HA) namenode" + " and does not use port information."); } } providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth); return providerNN; }
/** * Generate a dummy namenode proxy instance that utilizes our hacked * {@link LossyRetryInvocationHandler}. Proxy instance generated using this * method will proactively drop RPC responses. Currently this method only * support HA setup. null will be returned if the given configuration is not * for HA. * * @param config the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @param numResponseToDrop The number of responses to drop for each RPC call * @param fallbackToSimpleAuth set to true or false during calls to indicate * if a secure client falls back to simple auth * @return an object containing both the proxy and the associated * delegation token service it corresponds to. Will return null of the * given configuration does not support HA. * @throws IOException if there is an error creating the proxy */ public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler( Configuration config, URI nameNodeUri, Class<T> xface, int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth) throws IOException { Preconditions.checkArgument(numResponseToDrop > 0); AbstractNNFailoverProxyProvider<T> failoverProxyProvider = createFailoverProxyProvider(config, nameNodeUri, xface, true, fallbackToSimpleAuth); if (failoverProxyProvider != null) { // HA case int delay = config.getInt( HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); int maxCap = config.getInt( HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT); int maxFailoverAttempts = config.getInt( HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); int maxRetryAttempts = config.getInt( HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT); InvocationHandler dummyHandler = new LossyRetryInvocationHandler<>( numResponseToDrop, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, Math.max(numResponseToDrop + 1, maxRetryAttempts), delay, maxCap)); @SuppressWarnings("unchecked") T proxy = (T) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[]{xface}, dummyHandler); Text dtService; if (failoverProxyProvider.useLogicalURI()) { dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri, HdfsConstants.HDFS_URI_SCHEME); } else { dtService = SecurityUtil.buildTokenService( DFSUtilClient.getNNAddress(nameNodeUri)); } return new ProxyAndInfo<>(proxy, dtService, DFSUtilClient.getNNAddress(nameNodeUri)); } else { LOG.warn("Currently creating proxy using " + "LossyRetryInvocationHandler requires NN HA setup"); return null; } }