/**
 * Opens a connection using the given RPC client implementation and checks that the server
 * reports the expected user name and the TOKEN authentication method, then checks that a
 * request for a new authentication token is rejected.
 *
 * @param rpcImplClass RPC client implementation whose class name is written to the shared
 *          test configuration before the connection is created
 * @throws IOException if the connection or table cannot be opened, or the coprocessor
 *           channel cannot be obtained
 * @throws ServiceException if the whoAmI call fails
 */
private void testTokenAuth(Class<? extends RpcClient> rpcImplClass)
    throws IOException, ServiceException {
  TEST_UTIL.getConfiguration().set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
    rpcImplClass.getName());
  try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      Table table = conn.getTable(TableName.META_TABLE_NAME)) {
    CoprocessorRpcChannel rpcChannel = table.coprocessorService(HConstants.EMPTY_START_ROW);
    AuthenticationProtos.AuthenticationService.BlockingInterface service =
      AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);

    WhoAmIResponse response = service.whoAmI(null, WhoAmIRequest.getDefaultInstance());
    assertEquals(USERNAME, response.getUsername());
    assertEquals(AuthenticationMethod.TOKEN.name(), response.getAuthMethod());

    // Track whether the rejection actually happened; without this the test would pass
    // silently if the server started handing out tokens to token-authenticated clients.
    boolean tokenRequestDenied = false;
    try {
      service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance());
    } catch (ServiceException e) {
      AccessDeniedException exc = (AccessDeniedException) ProtobufUtil.getRemoteException(e);
      assertTrue(exc.getMessage().contains(
        "Token generation only allowed for Kerberos authenticated clients"));
      tokenRequestDenied = true;
    }
    assertTrue("Expected token generation to be denied for a token-authenticated client",
      tokenRequestDenied);
  }
}
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, User user) { this.conf = conf; this.user = user; this.connConf = new AsyncConnectionConfiguration(conf); this.registry = registry; this.rpcClient = RpcClientFactory.createClient(conf, clusterId); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); this.locator = new AsyncRegionLocator(this, RETRY_TIMER); this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { nonceGenerator = PerClientRandomNonceGenerator.get(); } else { nonceGenerator = NO_NONCE_GENERATOR; } }
@Test public void testTokenAuthentication() throws Exception { UserGroupInformation testuser = UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"}); testuser.setAuthenticationMethod( UserGroupInformation.AuthenticationMethod.TOKEN); final Configuration conf = TEST_UTIL.getConfiguration(); UserGroupInformation.setConfiguration(conf); Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser"); LOG.debug("Got token: " + token.toString()); testuser.addToken(token); // verify the server authenticates us as this token user testuser.doAs(new PrivilegedExceptionAction<Object>() { public Object run() throws Exception { Configuration c = server.getConfiguration(); RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString()); ServerName sn = ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(), System.currentTimeMillis()); try { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT); AuthenticationProtos.AuthenticationService.BlockingInterface stub = AuthenticationProtos.AuthenticationService.newBlockingStub(channel); AuthenticationProtos.WhoAmIResponse response = stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); String myname = response.getUsername(); assertEquals("testuser", myname); String authMethod = response.getAuthMethod(); assertEquals("TOKEN", authMethod); } finally { rpcClient.close(); } return null; } }); }
/** * @throws java.lang.Exception */ @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(SLAVES); // Set the custom RPC client with random timeouts as the client TEST_UTIL.getConfiguration().set( RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RandomTimeoutRpcClient.class.getName()); }
/**
 * Starts a mini ZooKeeper cluster, writes a placeholder master address znode, and creates
 * the master and RPC client used by the tests.
 */
@Before
public void setUp() throws Exception {
  Configuration conf = testUtil.getConfiguration();
  conf.set(HConstants.MASTER_PORT, "0");
  conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
  testUtil.startMiniZKCluster();

  CoordinatedStateManager stateManager =
    CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);

  // Seed the master address znode with a dummy value before the master is constructed.
  ZooKeeperWatcher zkWatcher = testUtil.getZooKeeperWatcher();
  ZKUtil.createWithParents(zkWatcher, zkWatcher.getMasterAddressZNode(),
    Bytes.toBytes("fake:123"));

  master = new HMaster(conf, stateManager);
  rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
}
/** * constructor * @param conf Configuration object * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection * to zk and shutdown of all services; we just close down the resources this connection was * responsible for and decrement usage counters. It is up to the caller to do the full * cleanup. It is set when we want have connection sharing going on -- reuse of zk connection, * and cached region locations, established regionserver connections, etc. When connections * are shared, we have reference counting going on and will only do full cleanup when no more * users of an HConnectionImplementation instance. */ HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool, User user) throws IOException { this(conf); this.user = user; this.batchPool = pool; this.managed = managed; this.registry = setupRegistry(); retrieveClusterId(); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); // Do we publish the status? boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); if (shouldListen) { if (listenerClass == null) { LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); } else { clusterStatusListener = new ClusterStatusListener( new ClusterStatusListener.DeadServerHandler() { @Override public void newDead(ServerName sn) { clearCaches(sn); rpcClient.cancelConnections(sn); } }, conf, listenerClass); } } }
private void initializeThreads() throws IOException { // Cache flushing thread. this.cacheFlusher = new MemStoreFlusher(conf, this); // Compaction thread this.compactSplitThread = new CompactSplitThread(this); // Background thread to check for compactions; needed if region has not gotten updates // in a while. It will take care of not checking too frequently on store-by-store basis. this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this); this.leases = new Leases(this.threadWakeFrequency); // Create the thread to clean the moved regions list movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this); if (this.nonceManager != null) { // Create the chore that cleans up nonces. nonceManagerChore = this.nonceManager.createCleanupChore(this); } // Setup RPC client for master communication rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( rpcServices.isa.getAddress(), 0)); int storefileRefreshPeriod = conf.getInt( StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); if (storefileRefreshPeriod > 0) { this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this); } registerConfigurationObservers(); }
/** * constructor * * @param conf Configuration object * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection * to zk and shutdown of all services; we just close down the resources this connection was * responsible for and decrement usage counters. It is up to the caller to do the full * cleanup. It is set when we want have connection sharing going on -- reuse of zk connection, * and cached region locations, established regionserver connections, etc. When connections * are shared, we have reference counting going on and will only do full cleanup when no more * users of an HConnectionImplementation instance. */ HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool, User user) throws IOException { this(conf); this.user = user; this.batchPool = pool;//==null ? this.managed = managed; this.registry = setupRegistry(); retrieveClusterId(); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); // Do we publish the status? boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); if (shouldListen) { if (listenerClass == null) { LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); } else { clusterStatusListener = new ClusterStatusListener( new ClusterStatusListener.DeadServerHandler() { @Override public void newDead(ServerName sn) { clearCaches(sn); rpcClient.cancelConnections(sn); } }, conf, listenerClass); } } }
/** * All initialization needed before we go register with Master.<br> * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br> * In here we just put up the RpcServer, setup Connection, and ZooKeeper. */ private void preRegistrationInitialization() { try { initializeZooKeeper(); setupClusterConnection(); // Setup RPC client for master communication this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); } catch (Throwable t) { // Call stop if error or process will stick around for ever since server // puts up non-daemon threads. this.rpcServices.stop(); abort("Initialization of RS failed. Hence aborting RS.", t); } }
/**
 * Prepares the Kerberos login and the secured client and server configurations, selecting
 * the RPC client and server implementations under test.
 */
@Before
public void setUpTest() throws Exception {
  // Kerberos credentials and login.
  krbKeytab = getKeytabFileForTesting();
  krbPrincipal = getPrincipalForTesting();
  ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);

  // Client side: secured configuration using the configured RPC client implementation.
  clientConf = getSecuredConfiguration();
  clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);

  // Server side: secured configuration using the configured RPC server implementation.
  serverConf = getSecuredConfiguration();
  serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
}
/** * Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from * the stub, this function will throw root cause of that exception. */ private void callRpcService(User clientUser) throws Exception { SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); Mockito.when(securityInfoMock.getServerPrincipal()) .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); InetSocketAddress isa = new InetSocketAddress(HOST, 0); RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), isa, serverConf, new FifoRpcScheduler(serverConf, 1)); rpcServer.start(); try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) { BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser); TestThread th1 = new TestThread(stub); final Throwable exception[] = new Throwable[1]; Collections.synchronizedList(new ArrayList<Throwable>()); Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread th, Throwable ex) { exception[0] = ex; } }; th1.setUncaughtExceptionHandler(exceptionHandler); th1.start(); th1.join(); if (exception[0] != null) { // throw root cause. while (exception[0].getCause() != null) { exception[0] = exception[0].getCause(); } throw (Exception) exception[0]; } } finally { rpcServer.stop(); } }
/**
 * Starts a mini ZooKeeper cluster, writes a placeholder master address znode, and creates
 * the master and RPC client used by the tests.
 */
@Before
public void setUp() throws Exception {
  Configuration conf = testUtil.getConfiguration();
  conf.set(HConstants.MASTER_PORT, "0");
  conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
  testUtil.startMiniZKCluster();

  // Seed the master address znode with a dummy value before the master is constructed.
  ZKWatcher zkWatcher = testUtil.getZooKeeperWatcher();
  ZKUtil.createWithParents(zkWatcher, zkWatcher.znodePaths.masterAddressZNode,
    Bytes.toBytes("fake:123"));

  master = new HMaster(conf);
  rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
}
private void initializeThreads() throws IOException { // Cache flushing thread. this.cacheFlusher = new MemStoreFlusher(conf, this); // Compaction thread this.compactSplitThread = new CompactSplitThread(this); // Background thread to check for compactions; needed if region has not gotten updates // in a while. It will take care of not checking too frequently on store-by-store basis. this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this); this.leases = new Leases(this.threadWakeFrequency); // Create the thread to clean the moved regions list movedRegionsCleaner = MovedRegionsCleaner.create(this); if (this.nonceManager != null) { // Create the scheduled chore that cleans up nonces. nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); } // Setup the Quota Manager rsQuotaManager = new RegionServerQuotaManager(this); // Setup RPC client for master communication rpcClient = RpcClientFactory .createClient(conf, clusterId, new InetSocketAddress(rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); boolean onlyMetaRefresh = false; int storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); if (storefileRefreshPeriod == 0) { storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); onlyMetaRefresh = true; } if (storefileRefreshPeriod > 0) { this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); } registerConfigurationObservers(); }
/**
 * Starts a secured RPC server, creates a client with the requested RPC client implementation
 * and performs a call against the server from a separate test thread.
 *
 * @param rpcImplClass RPC client implementation the client must be created with
 * @param clientUser user the blocking channel is created for
 * @param clientConf base client configuration; it is copied, not modified
 * @param allowInsecureFallback value written to the server's
 *          {@code RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH} setting
 * @throws Exception if the server's listener address is unavailable or any setup step fails
 */
private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
    Configuration clientConf, boolean allowInsecureFallback) throws Exception {
  Configuration clientConfCopy = new Configuration(clientConf);
  clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());

  Configuration conf = getSecuredConfiguration();
  conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);

  SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
  Mockito.when(securityInfoMock.getServerPrincipal())
    .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
  SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);

  InetSocketAddress isa = new InetSocketAddress(HOST, 0);

  RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestSecureIPC",
    Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa, conf,
    new FifoRpcScheduler(conf, 1));
  rpcServer.start();
  // Build the client from the copy that carries the requested implementation; using the
  // caller's clientConf here would silently ignore rpcImplClass.
  try (RpcClient rpcClient = RpcClientFactory.createClient(clientConfCopy,
    HConstants.DEFAULT_CLUSTER_ID.toString())) {
    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
      ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
      clientUser, 5000);
    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
      TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
    List<String> results = new ArrayList<String>();
    TestThread th1 = new TestThread(stub, results);
    th1.start();
    th1.join();
  } finally {
    rpcServer.stop();
  }
}
/** * Test that a client that fails an RPC to the master retries properly and * doesn't throw any unexpected exceptions. * @throws Exception */ @Test public void testAdminTimeout() throws Exception { Connection lastConnection = null; boolean lastFailed = false; int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get(); RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); try { for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) { lastFailed = false; // Ensure the HBaseAdmin uses a new connection by changing Configuration. Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); HBaseAdmin admin = null; try { admin = new HBaseAdmin(conf); Connection connection = admin.getConnection(); assertFalse(connection == lastConnection); lastConnection = connection; // run some admin commands HBaseAdmin.checkHBaseAvailable(conf); admin.setBalancerRunning(false, false); } catch (MasterNotRunningException ex) { // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get // a MasterNotRunningException. It's a bug if we get other exceptions. lastFailed = true; } finally { admin.close(); if (admin.getConnection().isClosed()) { rpcClient = (RandomTimeoutRpcClient) RpcClientFactory .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); } } } // Ensure the RandomTimeoutRpcEngine is actually being used. assertFalse(lastFailed); assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations); } finally { rpcClient.close(); } }
/** * To run this test, we must specify the following system properties: *<p> * <b> hbase.regionserver.kerberos.principal </b> * <p> * <b> hbase.regionserver.keytab.file </b> */ @Test public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception { assumeTrue(isKerberosPropertySetted()); String krbKeytab = getKeytabFileForTesting(); String krbPrincipal = getPrincipalForTesting(); Configuration cnf = new Configuration(); cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(cnf); UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab); UserGroupInformation ugi = UserGroupInformation.getLoginUser(); UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser(); // check that the login user is okay: assertSame(ugi, ugi2); assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); assertEquals(krbPrincipal, ugi.getUserName()); Configuration conf = getSecuredConfiguration(); SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); Mockito.when(securityInfoMock.getServerPrincipal()) .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); SecurityInfo.addInfo("TestDelayedService", securityInfoMock); boolean delayReturnValue = false; InetSocketAddress isa = new InetSocketAddress("localhost", 0); TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue); BlockingService service = TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance); rpcServer = new RpcServer(null, "testSecuredDelayedRpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa, conf, new FifoRpcScheduler(conf, 1)); rpcServer.start(); RpcClient rpcClient = RpcClientFactory .createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer.getListenerAddress().getPort(), 
System.currentTimeMillis()), User.getCurrent(), 1000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); List<Integer> results = new ArrayList<Integer>(); TestThread th1 = new TestThread(stub, true, results); th1.start(); Thread.sleep(100); th1.join(); assertEquals(0xDEADBEEF, results.get(0).intValue()); } finally { rpcClient.close(); } }
@Test public void testRPCException() throws Exception { HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); TEST_UTIL.startMiniZKCluster(); Configuration conf = TEST_UTIL.getConfiguration(); conf.set(HConstants.MASTER_PORT, "0"); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); HMaster hm = new HMaster(conf, cp); ServerName sm = hm.getServerName(); RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); try { int i = 0; //retry the RPC a few times; we have seen SocketTimeoutExceptions if we //try to connect too soon. Retry on SocketTimeoutException. while (i < 20) { try { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); MasterProtos.MasterService.BlockingInterface stub = MasterProtos.MasterService.newBlockingStub(channel); stub.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance()); fail(); } catch (ServiceException ex) { IOException ie = ProtobufUtil.getRemoteException(ex); if (!(ie instanceof SocketTimeoutException)) { if (ie.getMessage().startsWith("org.apache.hadoop.hbase.ipc." + "ServerNotRunningYetException: Server is not running yet")) { // Done. Got the exception we wanted. System.out.println("Expected exception: " + ie.getMessage()); return; } else { throw ex; } } else { System.err.println("Got SocketTimeoutException. Will retry. "); } } catch (Throwable t) { fail("Unexpected throwable: " + t); } Thread.sleep(100); i++; } fail(); } finally { rpcClient.close(); } }
/**
 * Selects the RPC client implementation under test by writing its class name into the shared
 * test configuration before each test method.
 */
@Before
public void setUpBeforeMethod() {
  TEST_UTIL.getConfiguration()
    .set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
}
/** * Test that a client that fails an RPC to the master retries properly and * doesn't throw any unexpected exceptions. * @throws Exception */ @Test public void testAdminTimeout() throws Exception { boolean lastFailed = false; int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get(); RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); try { for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) { lastFailed = false; // Ensure the HBaseAdmin uses a new connection by changing Configuration. Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); Admin admin = null; Connection connection = null; try { connection = ConnectionFactory.createConnection(conf); admin = connection.getAdmin(); // run some admin commands HBaseAdmin.available(conf); admin.setBalancerRunning(false, false); } catch (MasterNotRunningException ex) { // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get // a MasterNotRunningException. It's a bug if we get other exceptions. lastFailed = true; } finally { if(admin != null) { admin.close(); if (admin.getConnection().isClosed()) { rpcClient = (RandomTimeoutRpcClient) RpcClientFactory .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); } } if(connection != null) { connection.close(); } } } // Ensure the RandomTimeoutRpcEngine is actually being used. assertFalse(lastFailed); assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations); } finally { rpcClient.close(); } }