/** Gets the configured Failover proxy provider's class */ @VisibleForTesting public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri) throws IOException { if (nameNodeUri == null) { return null; } String host = nameNodeUri.getHost(); String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host; try { @SuppressWarnings("unchecked") Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf .getClass(configKey, null, FailoverProxyProvider.class); return ret; } catch (RuntimeException e) { if (e.getCause() instanceof ClassNotFoundException) { throw new IOException("Could not load failover proxy provider class " + conf.get(configKey) + " which is configured for authority " + nameNodeUri, e); } else { throw e; } } }
private DFSClient genClientWithDummyHandler() throws IOException { URI nnUri = dfs.getUri(); FailoverProxyProvider<ClientProtocol> failoverProxyProvider = NameNodeProxies.createFailoverProxyProvider(conf, nnUri, ClientProtocol.class, true, null); InvocationHandler dummyHandler = new DummyRetryInvocationHandler( failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, Integer.MAX_VALUE, DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT, DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT)); ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[] { ClientProtocol.class }, dummyHandler); DFSClient client = new DFSClient(null, proxy, conf, null); return client; }
/** Gets the configured Failover proxy provider's class */ @VisibleForTesting public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri) throws IOException { if (nameNodeUri == null) { return null; } String host = nameNodeUri.getHost(); String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + host; try { @SuppressWarnings("unchecked") Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf.getClass(configKey, null, FailoverProxyProvider.class); return ret; } catch (RuntimeException e) { if (e.getCause() instanceof ClassNotFoundException) { throw new IOException("Could not load failover proxy provider class " + conf.get(configKey) + " which is configured for authority " + nameNodeUri, e); } else { throw e; } } }
private DFSClient genClientWithDummyHandler() throws IOException { URI nnUri = dfs.getUri(); FailoverProxyProvider<ClientProtocol> failoverProxyProvider = NameNodeProxiesClient.createFailoverProxyProvider(conf, nnUri, ClientProtocol.class, true, null); InvocationHandler dummyHandler = new DummyRetryInvocationHandler( failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, Integer.MAX_VALUE, HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT, HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT)); ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[] { ClientProtocol.class }, dummyHandler); DFSClient client = new DFSClient(null, proxy, conf, null); return client; }
@SuppressWarnings("unchecked") private static <T> FailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass, Class<T> xface, String jtAddress) throws IOException { Preconditions.checkArgument( xface.isAssignableFrom(JTProtocols.class), "Interface %s is not a JobTracker protocol", xface); try { Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, String.class, Class.class); FailoverProxyProvider<?> provider = ctor.newInstance(conf, jtAddress, xface); return (FailoverProxyProvider<T>) provider; } catch (Exception e) { String message = "Couldn't create proxy provider " + failoverProxyProviderClass; if (LOG.isDebugEnabled()) { LOG.debug(message, e); } if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { throw new IOException(message, e); } } }
/** * Creates the namenode proxy with the passed protocol. This will handle * creation of either HA- or non-HA-enabled proxy objects, depending upon * if the provided URI is a configured logical URI. * * @param conf the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @return an object containing both the proxy and the associated * delegation token service it corresponds to * @throws IOException if there is an error creating the proxy **/ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface); if (failoverProxyProviderClass == null) { // Non-HA case return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true); } else { // HA case FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface, nameNodeUri); Conf config = new Conf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); return new ProxyAndInfo<T>(proxy, dtService); } }
/** Creates the Failover proxy provider instance*/ @VisibleForTesting public static <T> FailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass, Class<T> xface, URI nameNodeUri) throws IOException { Preconditions.checkArgument( xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface); try { Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, xface); return provider; } catch (Exception e) { String message = "Couldn't create proxy provider " + failoverProxyProviderClass; if (LOG.isDebugEnabled()) { LOG.debug(message, e); } if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { throw new IOException(message, e); } } }
private DFSClient genClientWithDummyHandler() throws IOException { URI nnUri = dfs.getUri(); Class<FailoverProxyProvider<ClientProtocol>> failoverProxyProviderClass = NameNodeProxies.getFailoverProxyProviderClass(conf, nnUri, ClientProtocol.class); FailoverProxyProvider<ClientProtocol> failoverProxyProvider = NameNodeProxies.createFailoverProxyProvider(conf, failoverProxyProviderClass, ClientProtocol.class, nnUri); InvocationHandler dummyHandler = new DummyRetryInvocationHandler( failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, Integer.MAX_VALUE, DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT, DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT)); ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[] { ClientProtocol.class }, dummyHandler); DFSClient client = new DFSClient(null, proxy, conf, null); return client; }
/** * Test that there is no retry when invalid token exception is thrown. * Verfies fix for HADOOP-12054 */ @Test(expected = InvalidToken.class) public void testNoRetryOnInvalidToken() throws IOException { final Client client = new Client(LongWritable.class, conf); final TestServer server = new TestServer(1, false); TestInvalidTokenHandler handler = new TestInvalidTokenHandler(client, server); DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance( DummyProtocol.class.getClassLoader(), new Class[] { DummyProtocol.class }, handler); FailoverProxyProvider<DummyProtocol> provider = new DefaultFailoverProxyProvider<DummyProtocol>( DummyProtocol.class, proxy); DummyProtocol retryProxy = (DummyProtocol) RetryProxy.create(DummyProtocol.class, provider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, 100, 100, 10000, 0)); try { server.start(); retryProxy.dummyRun(); } finally { // Check if dummyRun called only once Assert.assertEquals(handler.invocations, 1); Client.setCallIdAndRetryCount(0, 0); client.stop(); server.stop(); } }
@SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxy(Configuration conf, String jtAddress, Class<T> xface) throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, jtAddress, xface); if (failoverProxyProviderClass == null) { // Non-HA case return createNonHAProxy(conf, NetUtils.createSocketAddr(jtAddress), xface, UserGroupInformation.getCurrentUser(), true); } else { // HA case FailoverProxyProvider<T> failoverProxyProvider = createFailoverProxyProvider(conf, failoverProxyProviderClass, xface, jtAddress); int maxFailoverAttempts = conf.getInt(HAUtil.MR_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, HAUtil.MR_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); long failoverSleepBaseMillis = conf.getInt(HAUtil.MR_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, HAUtil.MR_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); long failoverSleepMaxMillis = conf.getInt(HAUtil.MR_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, HAUtil.MR_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, failoverSleepBaseMillis, failoverSleepMaxMillis)); Text dtService = HAUtil.buildTokenServiceForLogicalAddress(jtAddress); return new ProxyAndInfo<T>(proxy, dtService); } }
private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( Configuration conf, String jtAddress, Class<T> xface) throws IOException { if (jtAddress == null) { return null; } String configKey = DFSUtil.addKeySuffixes( HAUtil.MR_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX, HAUtil.getLogicalName(jtAddress)); return (Class<FailoverProxyProvider<T>>) conf.getClass(configKey, null, FailoverProxyProvider.class); }
/** Gets the configured Failover proxy provider's class */ @VisibleForTesting public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException { if (nameNodeUri == null) { return null; } String host = nameNodeUri.getHost(); String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host; try { @SuppressWarnings("unchecked") Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf .getClass(configKey, null, FailoverProxyProvider.class); if (ret != null) { // If we found a proxy provider, then this URI should be a logical NN. // Given that, it shouldn't have a non-default port number. int port = nameNodeUri.getPort(); if (port > 0 && port != NameNode.DEFAULT_PORT) { throw new IOException("Port " + port + " specified in URI " + nameNodeUri + " but host '" + host + "' is a logical (HA) namenode" + " and does not use port information."); } } return ret; } catch (RuntimeException e) { if (e.getCause() instanceof ClassNotFoundException) { throw new IOException("Could not load failover proxy provider class " + conf.get(configKey) + " which is configured for authority " + nameNodeUri, e); } else { throw e; } } }
/** * Gets the configured Failover proxy provider's class */ private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException { if (nameNodeUri == null) { return null; } String host = nameNodeUri.getHost(); String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host; try { @SuppressWarnings("unchecked") Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf .getClass(configKey, null, FailoverProxyProvider.class); if (ret != null) { // If we found a proxy provider, then this URI should be a logical NN. // Given that, it shouldn't have a non-default port number. int port = nameNodeUri.getPort(); if (port > 0 && port != NameNode.DEFAULT_PORT) { throw new IOException( "Port " + port + " specified in URI " + nameNodeUri + " but host '" + host + "' is a logical (HA) namenode" + " and does not use port information."); } } return ret; } catch (RuntimeException e) { if (e.getCause() instanceof ClassNotFoundException) { throw new IOException("Could not load failover proxy provider class " + conf.get(configKey) + " which is configured for authority " + nameNodeUri, e); } else { throw e; } } }
/** * Creates the Failover proxy provider instance */ @SuppressWarnings("unchecked") private static <T> FailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass, Class<T> xface, URI nameNodeUri) throws IOException { Preconditions.checkArgument(xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface); try { Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri, xface); return (FailoverProxyProvider<T>) provider; } catch (Exception e) { String message = "Couldn't create proxy provider " + failoverProxyProviderClass; if (LOG.isDebugEnabled()) { LOG.debug(message, e); } if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { throw new IOException(message, e); } } }
/** * Test that there is no retry when invalid token exception is thrown. * Verfies fix for HADOOP-12054 */ @Test(expected = InvalidToken.class) public void testNoRetryOnInvalidToken() throws IOException { final Client client = new Client(LongWritable.class, conf); final TestServer server = new TestServer(1, false); TestInvalidTokenHandler handler = new TestInvalidTokenHandler(client, server); DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance( DummyProtocol.class.getClassLoader(), new Class[] { DummyProtocol.class }, handler); FailoverProxyProvider<DummyProtocol> provider = new DefaultFailoverProxyProvider<DummyProtocol>( DummyProtocol.class, proxy); DummyProtocol retryProxy = (DummyProtocol) RetryProxy.create(DummyProtocol.class, provider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, 100, 100, 10000, 0)); try { server.start(); retryProxy.dummyRun(); } finally { // Check if dummyRun called only once Assert.assertEquals(handler.invocations, 1); Client.setCallIdAndRetryCount(0, 0, null); client.stop(); server.stop(); } }
/** * Generate a dummy namenode proxy instance that utilizes our hacked * {@link LossyRetryInvocationHandler}. Proxy instance generated using this * method will proactively drop RPC responses. Currently this method only * support HA setup. null will be returned if the given configuration is not * for HA. * * @param config the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @param numResponseToDrop The number of responses to drop for each RPC call * @return an object containing both the proxy and the associated * delegation token service it corresponds to. Will return null of the * given configuration does not support HA. * @throws IOException if there is an error creating the proxy */ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler( Configuration config, URI nameNodeUri, Class<T> xface, int numResponseToDrop) throws IOException { Preconditions.checkArgument(numResponseToDrop > 0); Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(config, nameNodeUri, xface); if (failoverProxyProviderClass != null) { // HA case FailoverProxyProvider<T> failoverProxyProvider = createFailoverProxyProvider(config, failoverProxyProviderClass, xface, nameNodeUri); int delay = config.getInt( DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); int maxCap = config.getInt( DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT); int maxFailoverAttempts = config.getInt( DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>( numResponseToDrop, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay, maxCap)); T proxy = (T) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[] { xface }, dummyHandler); Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); return new ProxyAndInfo<T>(proxy, dtService); } else { LOG.warn("Currently creating proxy using " + "LossyRetryInvocationHandler requires NN HA setup"); return null; } }
/** * Creates the namenode proxy with the passed protocol. This will handle * creation of either HA- or non-HA-enabled proxy objects, depending upon * if the provided URI is a configured logical URI. * * @param conf the configuration containing the required IPC * properties, client failover configurations, etc. * @param nameNodeUri the URI pointing either to a specific NameNode * or to a logical nameservice. * @param xface the IPC interface which should be created * @return an object containing both the proxy and the associated * delegation token service it corresponds to * @throws IOException if there is an error creating the proxy **/ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface); if (failoverProxyProviderClass == null) { // Non-HA case return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true); } else { // HA case FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface, nameNodeUri); Conf config = new Conf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.maxRetryAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); return new ProxyAndInfo<T>(proxy, dtService); } }
/** Creates the Failover proxy provider instance*/ @VisibleForTesting public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth) throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; AbstractNNFailoverProxyProvider<T> providerNN; Preconditions.checkArgument( xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface); try { // Obtain the class of the proxy provider failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri); if (failoverProxyProviderClass == null) { return null; } // Create a proxy provider instance. Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, xface); // If the proxy provider is of an old implementation, wrap it. if (!(provider instanceof AbstractNNFailoverProxyProvider)) { providerNN = new WrappedFailoverProxyProvider<T>(provider); } else { providerNN = (AbstractNNFailoverProxyProvider<T>)provider; } } catch (Exception e) { String message = "Couldn't create proxy provider " + failoverProxyProviderClass; if (LOG.isDebugEnabled()) { LOG.debug(message, e); } if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { throw new IOException(message, e); } } // Check the port in the URI, if it is logical. if (checkPort && providerNN.useLogicalURI()) { int port = nameNodeUri.getPort(); if (port > 0 && port != NameNode.DEFAULT_PORT) { // Throwing here without any cleanup is fine since we have not // actually created the underlying proxies yet. throw new IOException("Port " + port + " specified in URI " + nameNodeUri + " but host '" + nameNodeUri.getHost() + "' is a logical (HA) namenode" + " and does not use port information."); } } providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth); return providerNN; }