public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager, ZooKeeperWatcher watcher) { super(watcher); taskFinisher = new TaskFinisher() { @Override public Status finish(ServerName workerName, String logfile) { try { WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration()); } catch (IOException e) { LOG.warn("Could not finish splitting of log file " + logfile, e); return Status.ERR; } return Status.DONE; } }; this.server = manager.getServer(); this.conf = server.getConfiguration(); }
/** * Utility for constructing an instance of the passed HMaster class. * @param masterClass * @param conf * @return HMaster instance. */ public static HMaster constructMaster(Class<? extends HMaster> masterClass, final Configuration conf, final CoordinatedStateManager cp) { try { Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class); return c.newInstance(conf, cp); } catch(Exception e) { Throwable error = e; if (e instanceof InvocationTargetException && ((InvocationTargetException)e).getTargetException() != null) { error = ((InvocationTargetException)e).getTargetException(); } throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". " , error); } }
/** * Creates a {@link RegionServerThread}. * Call 'start' on the returned thread to make it run. * @param c Configuration to use. * @param cp consensus provider to use * @param hrsc Class to create. * @param index Used distinguishing the object returned. * @throws IOException * @return Region server added. */ public static JVMClusterUtil.RegionServerThread createRegionServerThread( final Configuration c, CoordinatedStateManager cp, final Class<? extends HRegionServer> hrsc, final int index) throws IOException { HRegionServer server; try { Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class, CoordinatedStateManager.class); ctor.setAccessible(true); server = ctor.newInstance(c, cp); } catch (InvocationTargetException ite) { Throwable target = ite.getTargetException(); throw new RuntimeException("Failed construction of RegionServer: " + hrsc.toString() + ((target.getCause() != null)? target.getCause().getMessage(): ""), target); } catch (Exception e) { IOException ioe = new IOException(); ioe.initCause(e); throw ioe; } return new JVMClusterUtil.RegionServerThread(server, index); }
/** * Creates a {@link MasterThread}. * Call 'start' on the returned thread to make it run. * @param c Configuration to use. * @param cp consensus provider to use * @param hmc Class to create. * @param index Used distinguishing the object returned. * @throws IOException * @return Master added. */ public static JVMClusterUtil.MasterThread createMasterThread( final Configuration c, CoordinatedStateManager cp, final Class<? extends HMaster> hmc, final int index) throws IOException { HMaster server; try { server = hmc.getConstructor(Configuration.class, CoordinatedStateManager.class). newInstance(c, cp); } catch (InvocationTargetException ite) { Throwable target = ite.getTargetException(); throw new RuntimeException("Failed construction of Master: " + hmc.toString() + ((target.getCause() != null)? target.getCause().getMessage(): ""), target); } catch (Exception e) { IOException ioe = new IOException(); ioe.initCause(e); throw ioe; } return new JVMClusterUtil.MasterThread(server, index); }
private int start() throws Exception { Configuration conf = getConf(); HRegionServer.loadWinterConf(conf, null); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); try { // If 'local', don't start a region server here. Defer to // LocalHBaseCluster. It manages 'local' clusters. if (LocalHBaseCluster.isLocal(conf)) { LOG.warn("Not starting a distinct region server because " + HConstants.CLUSTER_DISTRIBUTED + " is false"); } else { logProcessInfo(getConf()); HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf, cp); hrs.start(); hrs.join(); if (hrs.isAborted()) { throw new RuntimeException("HRegionServer Aborted"); } } } catch (Throwable t) { LOG.error("Region server exiting", t); return 1; } return 0; }
@Test public void testClusterId() throws Exception { TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniDFSCluster(1); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); //start region server, needs to be separate //so we get an unset clusterId rst = JVMClusterUtil.createRegionServerThread(conf,cp, HRegionServer.class, 0); rst.start(); //Make sure RS is in blocking state Thread.sleep(10000); TEST_UTIL.startMiniHBaseCluster(1, 1); rst.waitForServerOnline(); String clusterId = ZKClusterId.readClusterIdZNode(TEST_UTIL.getZooKeeperWatcher()); assertNotNull(clusterId); assertEquals(clusterId, rst.getRegionServer().getClusterId()); }
/** * Utility for constructing an instance of the passed HMaster class. * * @param masterClass * @param conf * @return HMaster instance. */ public static HMaster constructMaster(Class<? extends HMaster> masterClass, final Configuration conf, final CoordinatedStateManager cp) { try { Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class); return c.newInstance(conf, cp); } catch (InvocationTargetException ite) { Throwable target = ite.getTargetException() != null ? ite.getTargetException() : ite; if (target.getCause() != null) target = target.getCause(); throw new RuntimeException("Failed construction of Master: " + masterClass.toString(), target); } catch (Exception e) { throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ((e.getCause() != null) ? e.getCause().getMessage() : ""), e); } }
private int start() throws Exception { Configuration conf = getConf(); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); try { // If 'local', don't start a region server here. Defer to // LocalHBaseCluster. It manages 'local' clusters. if (LocalHBaseCluster.isLocal(conf)) { LOG.warn("Not starting a distinct region server because " + HConstants.CLUSTER_DISTRIBUTED + " is false"); } else { logProcessInfo(getConf()); HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf, cp); hrs.start(); hrs.join(); if (hrs.isAborted()) { throw new RuntimeException("HRegionServer Aborted"); } } } catch (Throwable t) { LOG.error("Region server exiting", t); return 1; } return 0; }
@Override public void initialize(MasterServices master, MetricsMaster metricsMaster) throws IOException, UnsupportedOperationException { this.master = master; this.done = false; // setup the default procedure coordinator String name = master.getServerName().toString(); // get the configuration for the coordinator Configuration conf = master.getConfiguration(); long wakeFrequency = conf.getInt(BACKUP_WAKE_MILLIS_KEY, BACKUP_WAKE_MILLIS_DEFAULT); long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY,BACKUP_TIMEOUT_MILLIS_DEFAULT); int opThreads = conf.getInt(BACKUP_POOL_THREAD_NUMBER_KEY, BACKUP_POOL_THREAD_NUMBER_DEFAULT); // setup the default procedure coordinator ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(master); ProcedureCoordinatorRpcs comms = coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); }
@Override public void initialize(RegionServerServices rss) throws KeeperException { this.rss = rss; if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY + " setting"); return; } CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(rss); this.memberRpcs = coordManager .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); // read in the backup handler configuration properties Configuration conf = rss.getConfiguration(); long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT); // create the actual cohort member ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder()); }
/** * Utility for constructing an instance of the passed HMaster class. * @param masterClass * @param conf * @return HMaster instance. */ public static HMaster constructMaster(Class<? extends HMaster> masterClass, final Configuration conf, final CoordinatedStateManager cp) { try { Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class); return c.newInstance(conf, cp); } catch (InvocationTargetException ite) { Throwable target = ite.getTargetException() != null? ite.getTargetException(): ite; if (target.getCause() != null) target = target.getCause(); throw new RuntimeException("Failed construction of Master: " + masterClass.toString(), target); } catch (Exception e) { throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ((e.getCause() != null)? e.getCause().getMessage(): ""), e); } }
/** * Test verifies whether stale znodes of unknown tables as for the hbase:meta will be removed or * not. * @throws KeeperException * @throws IOException * @throws Exception */ @Test public void testMasterRestartShouldRemoveStaleZnodesOfUnknownTableAsForMeta() throws Exception { List<ServerName> destServers = new ArrayList<ServerName>(1); destServers.add(SERVERNAME_A); Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers); Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true); HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0); CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager( HTU.getConfiguration()); Server server = new HMaster(HTU.getConfiguration(), csm); Whitebox.setInternalState(server, "serverManager", this.serverManager); AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server, this.serverManager); try { TableName tableName = TableName.valueOf("dummyTable"); // set table in enabling state. am.getTableStateManager().setTableState(tableName, Table.State.ENABLING); am.joinCluster(); assertFalse("Table should not be present in zookeeper.", am.getTableStateManager().isTablePresent(tableName)); } finally { } }
@Test public void testClusterId() throws Exception { TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniDFSCluster(1); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); //start region server, needs to be separate //so we get an unset clusterId rst = JVMClusterUtil.createRegionServerThread(conf,cp, HRegionServer.class, 0); rst.start(); //Make sure RS is in blocking state Thread.sleep(10000); TEST_UTIL.startMiniHBaseCluster(1, 0); rst.waitForServerOnline(); String clusterId = ZKClusterId.readClusterIdZNode(TEST_UTIL.getZooKeeperWatcher()); assertNotNull(clusterId); assertEquals(clusterId, rst.getRegionServer().getClusterId()); }
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, FileSystem fs, LastSequenceId idChecker, CoordinatedStateManager csm, RecoveryMode mode) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.rootDir = rootDir; this.fs = fs; this.sequenceIdChecker = idChecker; this.csm = (BaseCoordinatedStateManager)csm; this.walFactory = factory; this.controller = new PipelineController(); entryBuffers = new EntryBuffers(controller, this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); // a larger minBatchSize may slow down recovery because replay writer has to wait for // enough edits before replaying them this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64); this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode); this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); if (csm != null && this.distributedLogReplay) { outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads); } else { if (this.distributedLogReplay) { LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly."); } this.distributedLogReplay = false; outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); } }
/** * Utility for constructing an instance of the passed HRegionServer class. * * @param regionServerClass * @param conf2 * @return HRegionServer instance. */ public static HRegionServer constructRegionServer( Class<? extends HRegionServer> regionServerClass, final Configuration conf2, CoordinatedStateManager cp) { try { Constructor<? extends HRegionServer> c = regionServerClass.getConstructor(Configuration.class, CoordinatedStateManager.class); return c.newInstance(conf2, cp); } catch (Exception e) { throw new RuntimeException( "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); } }
@Override public CoordinatedStateManager getCoordinatedStateManager() { BaseCoordinatedStateManager m = Mockito.mock(BaseCoordinatedStateManager.class); SplitLogManagerCoordination c = Mockito.mock(SplitLogManagerCoordination.class); Mockito.when(m.getSplitLogManagerCoordination()).thenReturn(c); SplitLogManagerDetails d = Mockito.mock(SplitLogManagerDetails.class); Mockito.when(c.getDetails()).thenReturn(d); return m; }
/** * Test starting master then stopping it before its fully up. * @throws IOException * @throws KeeperException * @throws InterruptedException */ @Test (timeout=30000) public void testStopDuringStart() throws IOException, KeeperException, InterruptedException { CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager( TESTUTIL.getConfiguration()); HMaster master = new HMaster(TESTUTIL.getConfiguration(), cp); master.start(); // Immediately have it stop. We used hang in assigning meta. master.stopMaster(); master.join(); }
@Before public void setup() { Configuration conf = HBaseConfiguration.create(); conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); master = HMaster.constructMaster(HMaster.class, conf, cp); priority = master.getMasterRpcServices().getPriority(); user = User.createUserForTesting(conf, "someuser", new String[]{"somegroup"}); }
@Before public void setUp() throws Exception { Configuration conf = testUtil.getConfiguration(); conf.set(HConstants.MASTER_PORT, "0"); conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 2000); testUtil.startMiniZKCluster(); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); ZooKeeperWatcher watcher = testUtil.getZooKeeperWatcher(); ZKUtil.createWithParents(watcher, watcher.getMasterAddressZNode(), Bytes.toBytes("fake:123")); master = new HMaster(conf, cp); rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); }
@Test public void testFailAfterPONR() throws IOException, KeeperException, InterruptedException { final int rowCountOfRegionA = loadRegion(this.region_a, CF, true); final int rowCountOfRegionB = loadRegion(this.region_b, CF, true); assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0); assertEquals(rowCountOfRegionA, countRows(this.region_a)); assertEquals(rowCountOfRegionB, countRows(this.region_b)); // Start transaction. RegionMergeTransactionImpl mt = prepareOnGoodRegions(); Mockito.doThrow(new MockedFailedMergedRegionOpen()) .when(mt) .openMergedRegion((Server) Mockito.anyObject(), (RegionServerServices) Mockito.anyObject(), (HRegion) Mockito.anyObject()); // Run the execute. Look at what it returns. boolean expectedException = false; TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager( TEST_UTIL.getConfiguration()); Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration(), cp); try { mt.execute(mockServer, null); } catch (MockedFailedMergedRegionOpen e) { expectedException = true; } assertTrue(expectedException); // Run rollback returns false that we should restart. assertFalse(mt.rollback(null, null)); // Make sure that merged region is still in the filesystem, that // they have not been removed; this is supposed to be the case if we go // past point of no return. Path tableDir = this.region_a.getRegionFileSystem().getRegionDir() .getParent(); Path mergedRegionDir = new Path(tableDir, mt.getMergedRegionInfo() .getEncodedName()); assertTrue(TEST_UTIL.getTestFileSystem().exists(mergedRegionDir)); }
@Before public void setup() { Configuration conf = HBaseConfiguration.create(); conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf, cp); priority = regionServer.rpcServices.getPriority(); }
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, FileSystem fs, LastSequenceId idChecker, CoordinatedStateManager csm, RecoveryMode mode) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.rootDir = rootDir; this.fs = fs; this.sequenceIdChecker = idChecker; this.csm = (BaseCoordinatedStateManager)csm; this.walFactory = factory; entryBuffers = new EntryBuffers( this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); // a larger minBatchSize may slow down recovery because replay writer has to wait for // enough edits before replaying them this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64); this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode); this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); if (csm != null && this.distributedLogReplay) { outputSink = new LogReplayOutputSink(numWriterThreads); } else { if (this.distributedLogReplay) { LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly."); } this.distributedLogReplay = false; outputSink = new LogRecoveredEditsOutputSink(numWriterThreads); } }
/** * Utility for constructing an instance of the passed HRegionServer class. * * @param regionServerClass * @param conf2 * @return HRegionServer instance. */ public static HRegionServer constructRegionServer( Class<? extends HRegionServer> regionServerClass, final Configuration conf2, CoordinatedStateManager cp) { try { Constructor<? extends HRegionServer> c = regionServerClass .getConstructor(Configuration.class, CoordinatedStateManager.class); return c.newInstance(conf2, cp); } catch (Exception e) { throw new RuntimeException("Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); } }
/** * Test verifies whether stale znodes of unknown tables as for the hbase:meta will be removed or * not. * @throws KeeperException * @throws IOException * @throws Exception */ @Test (timeout=180000) public void testMasterRestartShouldRemoveStaleZnodesOfUnknownTableAsForMeta() throws Exception { List<ServerName> destServers = new ArrayList<ServerName>(1); destServers.add(SERVERNAME_A); Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers); Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true); HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0); CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager( HTU.getConfiguration()); Server server = new HMaster(HTU.getConfiguration(), csm); Whitebox.setInternalState(server, "serverManager", this.serverManager); AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server, this.serverManager); Whitebox.setInternalState(server, "metaTableLocator", Mockito.mock(MetaTableLocator.class)); // Make it so we can get a catalogtracker from servermanager.. .needed // down in guts of server shutdown handler. Whitebox.setInternalState(server, "clusterConnection", am.getConnection()); try { TableName tableName = TableName.valueOf("dummyTable"); // set table in enabling state. am.getTableStateManager().setTableState(tableName, Table.State.ENABLING); am.joinCluster(); assertFalse("Table should not be present in zookeeper.", am.getTableStateManager().isTablePresent(tableName)); } finally { am.shutdown(); } }
@Test public void testFailAfterPONR() throws IOException, KeeperException, InterruptedException { final int rowCountOfRegionA = loadRegion(this.region_a, CF, true); final int rowCountOfRegionB = loadRegion(this.region_b, CF, true); assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0); assertEquals(rowCountOfRegionA, countRows(this.region_a)); assertEquals(rowCountOfRegionB, countRows(this.region_b)); // Start transaction. RegionMergeTransaction mt = prepareOnGoodRegions(); Mockito.doThrow(new MockedFailedMergedRegionOpen()) .when(mt) .openMergedRegion((Server) Mockito.anyObject(), (RegionServerServices) Mockito.anyObject(), (HRegion) Mockito.anyObject()); // Run the execute. Look at what it returns. boolean expectedException = false; TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager( TEST_UTIL.getConfiguration()); Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration(), cp); try { mt.execute(mockServer, null); } catch (MockedFailedMergedRegionOpen e) { expectedException = true; } assertTrue(expectedException); // Run rollback returns false that we should restart. assertFalse(mt.rollback(null, null)); // Make sure that merged region is still in the filesystem, that // they have not been removed; this is supposed to be the case if we go // past point of no return. Path tableDir = this.region_a.getRegionFileSystem().getRegionDir() .getParent(); Path mergedRegionDir = new Path(tableDir, mt.getMergedRegionInfo() .getEncodedName()); assertTrue(TEST_UTIL.getTestFileSystem().exists(mergedRegionDir)); }
HLogSplitter(Configuration conf, Path rootDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, CoordinatedStateManager csm) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.rootDir = rootDir; this.fs = fs; this.sequenceIdChecker = idChecker; this.watcher = zkw; this.csm = csm; entryBuffers = new EntryBuffers( this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); // a larger minBatchSize may slow down recovery because replay writer has to wait for // enough edits before replaying them this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64); this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf); this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); if (zkw != null && csm != null && this.distributedLogReplay) { outputSink = new LogReplayOutputSink(numWriterThreads); } else { if (this.distributedLogReplay) { LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly."); } this.distributedLogReplay = false; outputSink = new LogRecoveredEditsOutputSink(numWriterThreads); } }
/** * Test verifies whether all the enabling table regions assigned only once during master startup. * * @throws KeeperException * @throws IOException * @throws Exception */ @Test public void testMasterRestartWhenTableInEnabling() throws KeeperException, IOException, Exception { enabling = true; List<ServerName> destServers = new ArrayList<ServerName>(1); destServers.add(SERVERNAME_A); Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers); Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true); HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0); CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager( HTU.getConfiguration()); Server server = new HMaster(HTU.getConfiguration(), csm); Whitebox.setInternalState(server, "serverManager", this.serverManager); AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server, this.serverManager); try { // set table in enabling state. am.getTableStateManager().setTableState(REGIONINFO.getTable(), Table.State.ENABLING); new EnableTableHandler(server, REGIONINFO.getTable(), am.getCatalogTracker(), am, new NullTableLockManager(), true).prepare() .process(); assertEquals("Number of assignments should be 1.", 1, assignmentCount); assertTrue("Table should be enabled.", am.getTableStateManager().isTableState(REGIONINFO.getTable(), Table.State.ENABLED)); } finally { enabling = false; assignmentCount = 0; am.getTableStateManager().setTableState(REGIONINFO.getTable(), Table.State.ENABLED); am.shutdown(); ZKAssign.deleteAllNodes(this.watcher); } }