/** * Constructs a new assignment manager. * * @param master * @param serverManager * @param catalogTracker * @param service * @throws KeeperException * @throws IOException */ public AssignmentManager(Server master, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, final ExecutorService service) throws KeeperException, IOException { super(master.getZooKeeper()); this.master = master; this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = master.getConfiguration(); this.timeoutMonitor = new TimeoutMonitor( conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000), master, serverManager, conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000)); this.timerUpdater = new TimerUpdater(conf.getInt( "hbase.master.assignment.timerupdater.period", 10000), master); Threads.setDaemonThreadRunning(timerUpdater.getThread(), master.getServerName() + ".timerUpdater"); this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.balancer = balancer; this.threadPoolExecutorService = Executors.newCachedThreadPool(); }
/** * Recover the tables that are not fully moved to ENABLED state. These tables * are in ENABLING state when the master restarted/switched * * @throws KeeperException * @throws org.apache.hadoop.hbase.TableNotFoundException * @throws IOException */ private void recoverTableInEnablingState() throws KeeperException, TableNotFoundException, IOException { Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher); if (enablingTables.size() != 0) { for (TableName tableName : enablingTables) { // Recover by calling EnableTableHandler LOG.info("The table " + tableName + " is in ENABLING state. Hence recovering by moving the table" + " to ENABLED state."); // enableTable in sync way during master startup, // no need to invoke coprocessor EnableTableHandler eth = new EnableTableHandler(this.server, tableName, catalogTracker, this, tableLockManager, true); try { eth.prepare(); } catch (TableNotFoundException e) { LOG.warn("Table " + tableName + " not found in hbase:meta to recover."); continue; } eth.process(); } } }
/**
 * Polls the given ZKTable every 100 ms until the table is reported DISABLED
 * or the timeout elapses.
 * <p>
 * If the waiting thread is interrupted, the interrupt status is restored and
 * the wait ends early with the last observed state, instead of silently
 * swallowing the interrupt and continuing to sleep.
 *
 * @param timeout maximum time to wait, in milliseconds
 * @param tableName table whose state is polled
 * @param zk table-state view that is queried
 * @return true if the table was observed as disabled, false otherwise
 */
private boolean waitUntilTableDisabled(long timeout, TableName tableName, ZKTable zk) {
  long startTime = System.currentTimeMillis();
  long remaining = timeout;
  boolean disabled = false;
  while (!(disabled = zk.isDisabledTable(tableName)) && remaining > 0) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Interrupted while waiting for table " + tableName
            + " set to DISABLED.");
      }
      // Preserve the interrupt for callers and stop waiting; a set interrupt
      // flag would make every further sleep() throw immediately anyway.
      Thread.currentThread().interrupt();
      break;
    }
    remaining = timeout - (System.currentTimeMillis() - startTime);
  }
  // The loop leaves 'disabled' holding the last observed state.
  return disabled;
}
/**
 * Polls the given ZKTable every 100 ms until the table is reported ENABLED
 * or the timeout elapses.
 * <p>
 * If the waiting thread is interrupted, the interrupt status is restored and
 * the wait ends early with the last observed state, instead of silently
 * swallowing the interrupt and continuing to sleep.
 *
 * @param timeout maximum time to wait, in milliseconds
 * @param tableName table whose state is polled
 * @param zk table-state view that is queried
 * @return true if the table was observed as enabled, false otherwise
 */
private boolean waitUntilTableEnabled(long timeout, TableName tableName, ZKTable zk) {
  long startTime = System.currentTimeMillis();
  long remaining = timeout;
  boolean enabled = false;
  while (!(enabled = zk.isEnabledTable(tableName)) && remaining > 0) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Interrupted while waiting for table " + tableName
            + " state set to ENABLED.");
      }
      // Preserve the interrupt for callers and stop waiting; a set interrupt
      // flag would make every further sleep() throw immediately anyway.
      Thread.currentThread().interrupt();
      break;
    }
    remaining = timeout - (System.currentTimeMillis() - startTime);
  }
  // The loop leaves 'enabled' holding the last observed state.
  return enabled;
}
private boolean testTableOnlineState(byte [] tableName, boolean online) throws IOException { if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { // The root region is always enabled return online; } String tableNameStr = Bytes.toString(tableName); try { if (online) { return ZKTable.isEnabledTable(this.zooKeeper, tableNameStr); } return ZKTable.isDisabledTable(this.zooKeeper, tableNameStr); } catch (KeeperException e) { throw new IOException("Enable/Disable failed", e); } }
/** * Constructs a new assignment manager. * * @param master * @param serverManager * @param catalogTracker * @param service * @throws KeeperException * @throws IOException */ public AssignmentManager(Server master, ServerManager serverManager, CatalogTracker catalogTracker, final ExecutorService service) throws KeeperException, IOException { super(master.getZooKeeper()); this.master = master; this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = master.getConfiguration(); this.timeoutMonitor = new TimeoutMonitor( conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000), master, serverManager, conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000)); Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), master.getServerName() + ".timeoutMonitor"); this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.threadPoolExecutorService = Executors.newCachedThreadPool(); }
/**
 * Copies the names of all tables that ZooKeeper reports as disabled or
 * disabling into the local {@code disabledTables} collection.
 *
 * @throws ZooKeeperConnectionException
 * @throws IOException also used to wrap any KeeperException from the lookup
 */
private void loadDisabledTables()
throws ZooKeeperConnectionException, IOException {
  HConnectable<Void> loader = new HConnectable<Void>(conf) {
    @Override
    public Void connect(HConnection connection) throws IOException {
      final ZooKeeperWatcher zk = connection.getZooKeeperWatcher();
      try {
        for (String name : ZKTable.getDisabledOrDisablingTables(zk)) {
          disabledTables.add(Bytes.toBytes(name));
        }
      } catch (KeeperException cause) {
        throw new IOException(cause);
      }
      return null;
    }
  };
  HConnectionManager.execute(loader);
}
/** * Recover the tables that are not fully moved to ENABLED state. These tables * are in ENABLING state when the master restarted/switched * * @throws KeeperException * @throws org.apache.hadoop.hbase.TableNotFoundException * @throws IOException */ private void recoverTableInEnablingState() throws KeeperException, TableNotFoundException, IOException { Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher); if (enablingTables.size() != 0) { for (TableName tableName : enablingTables) { // Recover by calling EnableTableHandler LOG.info("The table " + tableName + " is in ENABLING state. Hence recovering by moving the table" + " to ENABLED state."); // enableTable in sync way during master startup, // no need to invoke coprocessor new EnableTableHandler(this.server, tableName, catalogTracker, this, tableLockManager, true).prepare().process(); } } }
/** * Recover the tables that are not fully moved to ENABLED state. These tables * are in ENABLING state when the master restarted/switched * * @throws KeeperException * @throws TableNotFoundException * @throws IOException */ private void recoverTableInEnablingState() throws KeeperException, TableNotFoundException, IOException { Set<String> enablingTables = ZKTable.getEnablingTables(watcher); if (enablingTables.size() != 0) { for (String tableName : enablingTables) { // Recover by calling EnableTableHandler LOG.info("The table " + tableName + " is in ENABLING state. Hence recovering by moving the table" + " to ENABLED state."); // enableTable in sync way during master startup, // no need to invoke coprocessor new EnableTableHandler(this.server, tableName.getBytes(), catalogTracker, this, true).process(); } } }
/**
 * Polls the given ZKTable every 100 ms until the table is reported DISABLED
 * or the timeout elapses.
 * <p>
 * If the waiting thread is interrupted, the interrupt status is restored and
 * the wait ends early with the last observed state, instead of silently
 * swallowing the interrupt and continuing to sleep.
 *
 * @param timeout maximum time to wait, in milliseconds
 * @param tableName name of the table whose state is polled
 * @param zk table-state view that is queried
 * @return true if the table was observed as disabled, false otherwise
 */
private boolean waitUntilTableDisabled(long timeout, String tableName, ZKTable zk) {
  long startTime = System.currentTimeMillis();
  long remaining = timeout;
  boolean disabled = false;
  while (!(disabled = zk.isDisabledTable(tableName)) && remaining > 0) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Interrupted while waiting for table " + tableName
            + " set to DISABLED.");
      }
      // Preserve the interrupt for callers and stop waiting; a set interrupt
      // flag would make every further sleep() throw immediately anyway.
      Thread.currentThread().interrupt();
      break;
    }
    remaining = timeout - (System.currentTimeMillis() - startTime);
  }
  // The loop leaves 'disabled' holding the last observed state.
  return disabled;
}
/**
 * Polls the given ZKTable every 100 ms until the table is reported ENABLED
 * or the timeout elapses.
 * <p>
 * If the waiting thread is interrupted, the interrupt status is restored and
 * the wait ends early with the last observed state, instead of silently
 * swallowing the interrupt and continuing to sleep.
 *
 * @param timeout maximum time to wait, in milliseconds
 * @param tableName name of the table whose state is polled
 * @param zk table-state view that is queried
 * @return true if the table was observed as enabled, false otherwise
 */
private boolean waitUntilTableEnabled(long timeout, String tableName, ZKTable zk) {
  long startTime = System.currentTimeMillis();
  long remaining = timeout;
  boolean enabled = false;
  while (!(enabled = zk.isEnabledTable(tableName)) && remaining > 0) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Interrupted while waiting for table " + tableName
            + " state set to ENABLED.");
      }
      // Preserve the interrupt for callers and stop waiting; a set interrupt
      // flag would make every further sleep() throw immediately anyway.
      Thread.currentThread().interrupt();
      break;
    }
    remaining = timeout - (System.currentTimeMillis() - startTime);
  }
  // The loop leaves 'enabled' holding the last observed state.
  return enabled;
}
/** * Assigns all user regions, if any exist. Used during cluster startup. * <p> * This is a synchronous call and will return once every region has been * assigned. If anything fails, an exception is thrown and the cluster * should be shutdown. * @throws InterruptedException * @throws IOException * @throws KeeperException */ private void assignAllUserRegions() throws IOException, InterruptedException, KeeperException { // Cleanup any existing ZK nodes and start watching ZKAssign.deleteAllNodes(watcher); ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, this.watcher.assignmentZNode); failoverCleanupDone(); // Skip assignment for regions of tables in DISABLING state because during clean cluster startup // no RS is alive and regions map also doesn't have any information about the regions. // See HBASE-6281. Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher); disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher)); // Scan hbase:meta for all user regions, skipping any disabled tables Map<HRegionInfo, ServerName> allRegions; SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment = new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true); snapshotOfRegionAssignment.initialize(); allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap(); if (allRegions == null || allRegions.isEmpty()) return; // Determine what type of assignment to do on startup boolean retainAssignment = server.getConfiguration(). getBoolean("hbase.master.startup.retainassign", true); if (retainAssignment) { assign(allRegions); } else { List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet()); assign(regions); } for (HRegionInfo hri : allRegions.keySet()) { TableName tableName = hri.getTable(); if (!zkTable.isEnabledTable(tableName)) { setEnabledTable(tableName); } } }
/** * Recover the tables that were not fully moved to DISABLED state. These * tables are in DISABLING state when the master restarted/switched. * * @throws KeeperException * @throws TableNotFoundException * @throws IOException */ private void recoverTableInDisablingState() throws KeeperException, TableNotFoundException, IOException { Set<TableName> disablingTables = ZKTable.getDisablingTables(watcher); if (disablingTables.size() != 0) { for (TableName tableName : disablingTables) { // Recover by calling DisableTableHandler LOG.info("The table " + tableName + " is in DISABLING state. Hence recovering by moving the table" + " to DISABLED state."); new DisableTableHandler(this.server, tableName, catalogTracker, this, tableLockManager, true).prepare().process(); } } }
/**
 * Writes every queued (table name, state) pair from
 * {@code tablesToBeSetInZK} to ZooKeeper via {@code setStateInZK}.
 * Does nothing when the queue is null or empty.
 *
 * @throws IOException
 * @throws KeeperException
 */
private void setTablesInZK() throws IOException, KeeperException {
  if (tablesToBeSetInZK == null || tablesToBeSetInZK.isEmpty()) {
    return;
  }
  // One ZKTable instance is shared by all pending updates.
  ZKTable tableStates = new ZKTable(this.watcher);
  for (Pair<String, State> pending : tablesToBeSetInZK) {
    setStateInZK(tableStates, pending.getFirst(), pending.getSecond());
  }
}
/**
 * Records a table as ENABLED or DISABLED through the given ZKTable.
 * Any other state value results in no action.
 *
 * @param zkTable table-state writer to use
 * @param tableName name of the table to update
 * @param state state to record
 * @throws IOException
 * @throws KeeperException
 */
private void setStateInZK(ZKTable zkTable, String tableName, State state)
    throws IOException, KeeperException {
  if (state == State.ENABLED) {
    zkTable.setEnabledTable(TableName.valueOf(tableName));
  } else if (state == State.DISABLED) {
    zkTable.setDisabledTable(TableName.valueOf(tableName));
  }
}
/** * Constructs a new assignment manager. * * @param server * @param serverManager * @param catalogTracker * @param service * @throws KeeperException * @throws IOException */ public AssignmentManager(Server server, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster) throws KeeperException, IOException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = server.getConfiguration(); this.timeoutMonitor = new TimeoutMonitor( conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000), server, serverManager, conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 600000)); this.timerUpdater = new TimerUpdater(conf.getInt( "hbase.master.assignment.timerupdater.period", 10000), server); Threads.setDaemonThreadRunning(timerUpdater.getThread(), server.getServerName() + ".timerUpdater"); this.zkTable = new ZKTable(this.watcher); this.maximumAttempts = this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.balancer = balancer; int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("hbase-am")); this.metricsMaster = metricsMaster;// can be null only with tests. this.regionStates = new RegionStates(server, serverManager); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker"); zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, TimeUnit.SECONDS, threadFactory); }
/** * Assigns all user regions, if any exist. Used during cluster startup. * <p> * This is a synchronous call and will return once every region has been * assigned. If anything fails, an exception is thrown and the cluster * should be shutdown. * @throws InterruptedException * @throws IOException * @throws KeeperException */ private void assignAllUserRegions() throws IOException, InterruptedException, KeeperException { // Cleanup any existing ZK nodes and start watching ZKAssign.deleteAllNodes(watcher); ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, this.watcher.assignmentZNode); failoverCleanupDone(); // Skip assignment for regions of tables in DISABLING state because during clean cluster startup // no RS is alive and regions map also doesn't have any information about the regions. // See HBASE-6281. Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher); disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher)); // Scan META for all user regions, skipping any disabled tables Map<HRegionInfo, ServerName> allRegions = MetaReader.fullScan( catalogTracker, disabledOrDisablingOrEnabling, true); if (allRegions == null || allRegions.isEmpty()) return; // Determine what type of assignment to do on startup boolean retainAssignment = server.getConfiguration(). getBoolean("hbase.master.startup.retainassign", true); if (retainAssignment) { assign(allRegions); } else { List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet()); assign(regions); } for (HRegionInfo hri : allRegions.keySet()) { String tableName = hri.getTableNameAsString(); if (!zkTable.isEnabledTable(tableName)) { setEnabledTable(tableName); } } }
/** * Recover the tables that were not fully moved to DISABLED state. These * tables are in DISABLING state when the master restarted/switched. * * @throws KeeperException * @throws TableNotFoundException * @throws IOException */ private void recoverTableInDisablingState() throws KeeperException, TableNotFoundException, IOException { Set<String> disablingTables = ZKTable.getDisablingTables(watcher); if (disablingTables.size() != 0) { for (String tableName : disablingTables) { // Recover by calling DisableTableHandler LOG.info("The table " + tableName + " is in DISABLING state. Hence recovering by moving the table" + " to DISABLED state."); new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker, this, true).process(); } } }
/** * @return Instance of ZKTable. */ public ZKTable getZKTable() { // These are 'expensive' to make involving trip to zk ensemble so allow // sharing. return this.zkTable; }
/**
 * Verifies that an OpenedRegionHandler built with a stale znode version does
 * not complete the open: after processing, the region's znode must still be
 * present under the assignment znode.
 */
@Test
public void testShouldNotCompeleteOpenedRegionSuccessfullyIfVersionMismatches()
    throws Exception {
  HRegion region = null;
  try {
    int testIndex = 0;
    TEST_UTIL.startMiniZKCluster();
    final Server server = new MockServer(TEST_UTIL);
    HTableDescriptor htd = new HTableDescriptor(
        "testShouldNotCompeleteOpenedRegionSuccessfullyIfVersionMismatches");
    HRegionInfo hri = new HRegionInfo(htd.getName(),
        Bytes.toBytes(testIndex), Bytes.toBytes(testIndex + 1));
    region = HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(),
        TEST_UTIL.getConfiguration(), htd);
    assertNotNull(region);
    AssignmentManager am = Mockito.mock(AssignmentManager.class);
    when(am.isRegionInTransition(hri)).thenReturn(
        new RegionState(region.getRegionInfo(), RegionState.State.OPEN,
            System.currentTimeMillis(), server.getServerName()));
    // create a node with OPENED state
    zkw = HBaseTestingUtility.createAndForceNodeToOpenedState(TEST_UTIL,
        region, server.getServerName());
    when(am.getZKTable()).thenReturn(new ZKTable(zkw));
    Stat stat = new Stat();
    String nodeName = ZKAssign.getNodeName(zkw, region.getRegionInfo()
        .getEncodedName());
    ZKUtil.getDataAndWatch(zkw, nodeName, stat);

    // use the version for the OpenedRegionHandler
    OpenedRegionHandler handler = new OpenedRegionHandler(server, am, region
        .getRegionInfo(), server.getServerName(), stat.getVersion());
    // Once again overwrite the same znode so that the version changes.
    ZKAssign.transitionNode(zkw, region.getRegionInfo(), server
        .getServerName(), EventType.RS_ZK_REGION_OPENED,
        EventType.RS_ZK_REGION_OPENED, stat.getVersion());

    // Should not invoke assignmentmanager.regionOnline. If it is
    // invoked as per current mocking it will throw null pointer exception.
    boolean expectedException = false;
    try {
      handler.process();
    } catch (Exception e) {
      expectedException = true;
    }
    assertFalse("The process method should not throw any exception.",
        expectedException);
    List<String> znodes = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
        zkw.assignmentZNode);
    String regionName = znodes.get(0);
    // JUnit order is (message, expected, actual).
    assertEquals("The region should not be opened successfully.",
        region.getRegionInfo().getEncodedName(), regionName);
  } finally {
    try {
      // region stays null when setup fails before createHRegion returns;
      // guarding avoids an NPE that would mask the original failure.
      if (region != null) {
        region.close();
        region.getLog().closeAndDelete();
      }
    } finally {
      // Always stop the mini ZK cluster, even if region cleanup throws.
      TEST_UTIL.shutdownMiniZKCluster();
    }
  }
}
@Test (timeout=180000) public void testMasterFailoverWhenDisablingTableRegionsInRITOnDeadRS() throws Exception { MiniHBaseCluster cluster = TESTUTIL.getHBaseCluster(); HMaster master = cluster.getMaster(); // disable load balancing on this master master.balanceSwitch(false); final String table = "testMasterFailoverWhenDisablingTableRegionsInRITOnDeadRS"; byte [] FAMILY = Bytes.toBytes("family"); byte[][] SPLIT_KEYS = new byte[][] {Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d") }; HTableDescriptor htd = new HTableDescriptor(table); HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); htd.addFamily(hcd); TESTUTIL.getHBaseAdmin().createTable(htd, SPLIT_KEYS); AssignmentManager am = cluster.getMaster().getAssignmentManager(); List<HRegionInfo> regionsOfTable = null; while ((regionsOfTable = am.getRegionsOfTable(table.getBytes())).size() != (SPLIT_KEYS.length + 1)) { Thread.sleep(10); } HRegionInfo closingRegion = regionsOfTable.get(0); ServerName serverName = am.getRegionServerOfRegion(closingRegion); HRegionServer deadRS = null; for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { deadRS = cluster.getRegionServer(i); if (deadRS.getServerName().equals(serverName)) { break; } } // Disable the table in ZK ZKTable zkTable = am.getZKTable(); zkTable.setDisablingTable(table); ZKAssign.createNodeClosing(master.getZooKeeper(), closingRegion, serverName); // Stop the master abortMaster(cluster); master = startMasterAndWaitTillMetaRegionAssignment(cluster); deadRS.kill(); deadRS.join(); waitUntilMasterIsInitialized(master); am = cluster.getMaster().getAssignmentManager(); zkTable = am.getZKTable(); // wait for no more RIT ZKAssign.blockUntilNoRIT(master.getZooKeeper()); while (!master.getAssignmentManager().getZKTable().isDisabledTable(table)) { Thread.sleep(10); } assertTrue("Table should be disabled state.", zkTable.isDisabledTable(table)); HBaseAdmin admin = new HBaseAdmin(master.getConfiguration()); admin.deleteTable(table); }
/** * Constructs a new assignment manager. * * @param server * @param serverManager * @param catalogTracker * @param service * @throws KeeperException * @throws IOException */ public AssignmentManager(Server server, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, final TableLockManager tableLockManager) throws KeeperException, IOException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = server.getConfiguration(); // Only read favored nodes if using the favored nodes load balancer. this.shouldAssignRegionsWithFavoredNodes = conf.getClass( HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( FavoredNodeLoadBalancer.class); this.tomActivated = conf.getBoolean( ASSIGNMENT_TIMEOUT_MANAGEMENT, DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT); if (tomActivated){ this.serversInUpdatingTimer = new ConcurrentSkipListSet<ServerName>(); this.timeoutMonitor = new TimeoutMonitor( conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000), server, serverManager, conf.getInt(ASSIGNMENT_TIMEOUT, DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT)); this.timerUpdater = new TimerUpdater(conf.getInt( "hbase.master.assignment.timerupdater.period", 10000), server); Threads.setDaemonThreadRunning(timerUpdater.getThread(), server.getServerName() + ".timerUpdater"); } else { this.serversInUpdatingTimer = null; this.timeoutMonitor = null; this.timerUpdater = null; } this.zkTable = new ZKTable(this.watcher); // This is the max attempts, not retries, so it should be at least 1. 
this.maximumAttempts = Math.max(1, this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( "hbase.meta.assignment.retry.sleeptime", 1000l); this.balancer = balancer; int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); this.regionStates = new RegionStates(server, serverManager); this.bulkAssignWaitTillAllAssigned = conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker"); zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, TimeUnit.SECONDS, threadFactory); this.tableLockManager = tableLockManager; this.metricsAssignmentManager = new MetricsAssignmentManager(); }
/**
 * Verifies that an OpenedRegionHandler built with a stale znode version does
 * not complete the open: after processing, the region's znode must still be
 * present under the assignment znode.
 */
@Test
public void testShouldNotCompeleteOpenedRegionSuccessfullyIfVersionMismatches()
    throws Exception {
  HRegion region = null;
  try {
    int testIndex = 0;
    TEST_UTIL.startMiniZKCluster();
    final Server server = new MockServer(TEST_UTIL);
    HTableDescriptor htd = new HTableDescriptor(
        TableName.valueOf("testShouldNotCompeleteOpenedRegionSuccessfullyIfVersionMismatches"));
    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
        Bytes.toBytes(testIndex), Bytes.toBytes(testIndex + 1));
    region = HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(),
        TEST_UTIL.getConfiguration(), htd);
    assertNotNull(region);
    AssignmentManager am = Mockito.mock(AssignmentManager.class);
    RegionStates rsm = Mockito.mock(RegionStates.class);
    Mockito.doReturn(rsm).when(am).getRegionStates();
    when(rsm.isRegionInTransition(hri)).thenReturn(false);
    when(rsm.getRegionState(hri)).thenReturn(
        new RegionState(region.getRegionInfo(), RegionState.State.OPEN,
            System.currentTimeMillis(), server.getServerName()));
    // create a node with OPENED state
    zkw = HBaseTestingUtility.createAndForceNodeToOpenedState(TEST_UTIL,
        region, server.getServerName());
    when(am.getZKTable()).thenReturn(new ZKTable(zkw));
    Stat stat = new Stat();
    String nodeName = ZKAssign.getNodeName(zkw, region.getRegionInfo()
        .getEncodedName());
    ZKUtil.getDataAndWatch(zkw, nodeName, stat);

    // use the version for the OpenedRegionHandler
    OpenedRegionHandler handler = new OpenedRegionHandler(server, am, region
        .getRegionInfo(), server.getServerName(), stat.getVersion());
    // Once again overwrite the same znode so that the version changes.
    ZKAssign.transitionNode(zkw, region.getRegionInfo(), server
        .getServerName(), EventType.RS_ZK_REGION_OPENED,
        EventType.RS_ZK_REGION_OPENED, stat.getVersion());

    // Should not invoke assignmentmanager.regionOnline. If it is
    // invoked as per current mocking it will throw null pointer exception.
    boolean expectedException = false;
    try {
      handler.process();
    } catch (Exception e) {
      expectedException = true;
    }
    assertFalse("The process method should not throw any exception.",
        expectedException);
    List<String> znodes = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
        zkw.assignmentZNode);
    String regionName = znodes.get(0);
    // JUnit order is (message, expected, actual).
    assertEquals("The region should not be opened successfully.",
        region.getRegionInfo().getEncodedName(), regionName);
  } finally {
    try {
      HRegion.closeHRegion(region);
    } finally {
      // Always stop the mini ZK cluster, even if closing the region throws.
      TEST_UTIL.shutdownMiniZKCluster();
    }
  }
}