/** * Checks and sets table state in ZK. Sets no watches. * {@inheritDoc} */ @Override public boolean setTableStateIfInStates(TableName tableName, ZooKeeperProtos.Table.State newState, ZooKeeperProtos.Table.State... states) throws CoordinatedStateException { synchronized (this.cache) { // Transition ENABLED->DISABLING has to be performed with a hack, because // we treat empty state as enabled in this case because 0.92- clusters. if ( (newState == ZooKeeperProtos.Table.State.DISABLING) && this.cache.get(tableName) != null && !isTableState(tableName, states) || (newState != ZooKeeperProtos.Table.State.DISABLING && !isTableState(tableName, states) )) { return false; } try { setTableStateInZK(tableName, newState); } catch (KeeperException e) { throw new CoordinatedStateException(e); } return true; } }
/**
 * Drops the table from the local cache and deletes its znode in zookeeper.
 * A warning is logged when the table had no cached state. The znode delete is
 * issued through {@code ZKUtil.deleteNodeFailSilent}. Sets no watches.
 *
 * {@inheritDoc}
 *
 * @throws CoordinatedStateException wrapping any {@link KeeperException} from the delete
 */
@Override
public void setDeletedTable(final TableName tableName) throws CoordinatedStateException {
  synchronized (this.cache) {
    final boolean hadCachedState = this.cache.remove(tableName) != null;
    if (!hadCachedState) {
      LOG.warn("Moving table " + tableName + " state to deleted but was already deleted");
    }

    try {
      final String tableNode =
          ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString());
      ZKUtil.deleteNodeFailSilent(this.watcher, tableNode);
    } catch (KeeperException ke) {
      throw new CoordinatedStateException(ke);
    }
  }
}
/**
 * Removes the table from the cache when it is currently in the given state and,
 * if {@code deletePermanentState} is set, also deletes its znode.
 * Nothing happens when the state check fails.
 *
 * {@inheritDoc}
 *
 * @throws CoordinatedStateException wrapping any {@link KeeperException} from the delete
 */
@Override
public void checkAndRemoveTableState(TableName tableName,
    ZooKeeperProtos.Table.State states,
    boolean deletePermanentState) throws CoordinatedStateException {
  synchronized (this.cache) {
    if (!isTableState(tableName, states)) {
      return;
    }
    this.cache.remove(tableName);

    if (!deletePermanentState) {
      // Only the in-memory entry was requested to go away.
      return;
    }
    try {
      final String tableNode =
          ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString());
      ZKUtil.deleteNodeFailSilent(this.watcher, tableNode);
    } catch (KeeperException ke) {
      throw new CoordinatedStateException(ke);
    }
  }
}
/**
 * Waits, one region at a time, until each given region is no longer in transition.
 * While waiting, a region found in FAILED_OPEN state is marked offline through the
 * assignment manager.
 *
 * @param env procedure environment used to reach the assignment manager and to wait
 * @param regions regions to wait on
 */
protected static void waitRegionInTransition(final MasterProcedureEnv env,
    final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
  final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager();
  final RegionStates regionStates = assignmentManager.getRegionStates();

  for (final HRegionInfo hri : regions) {
    final String purpose = "regions " + hri.getRegionNameAsString() + " in transition";
    final ProcedureSyncWait.Predicate<Boolean> outOfTransition =
        new ProcedureSyncWait.Predicate<Boolean>() {
          @Override
          public Boolean evaluate() throws IOException {
            // A region stuck in FAILED_OPEN is taken offline before re-checking.
            if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
              assignmentManager.regionOffline(hri);
            }
            return !regionStates.isRegionInTransition(hri);
          }
        };
    ProcedureSyncWait.waitFor(env, purpose, outOfTransition);
  }
}
/** * Recover the tables that were not fully moved to DISABLED state. These * tables are in DISABLING state when the master restarted/switched. * * @throws KeeperException * @throws TableNotFoundException * @throws IOException */ private void recoverTableInDisablingState() throws KeeperException, IOException, CoordinatedStateException { Set<TableName> disablingTables = tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING); if (disablingTables.size() != 0) { for (TableName tableName : disablingTables) { // Recover by calling DisableTableHandler LOG.info("The table " + tableName + " is in DISABLING state. Hence recovering by moving the table" + " to DISABLED state."); new DisableTableHandler(this.server, tableName, this, tableLockManager, true).prepare().process(); } } }
/** * Recover the tables that are not fully moved to ENABLED state. These tables * are in ENABLING state when the master restarted/switched * * @throws KeeperException * @throws org.apache.hadoop.hbase.TableNotFoundException * @throws IOException */ private void recoverTableInEnablingState() throws KeeperException, IOException, CoordinatedStateException { Set<TableName> enablingTables = tableStateManager. getTablesInStates(ZooKeeperProtos.Table.State.ENABLING); if (enablingTables.size() != 0) { for (TableName tableName : enablingTables) { // Recover by calling EnableTableHandler LOG.info("The table " + tableName + " is in ENABLING state. Hence recovering by moving the table" + " to ENABLED state."); // enableTable in sync way during master startup, // no need to invoke coprocessor EnableTableHandler eth = new EnableTableHandler(this.server, tableName, this, tableLockManager, true); try { eth.prepare(); } catch (TableNotFoundException e) { LOG.warn("Table " + tableName + " not found in hbase:meta to recover."); continue; } eth.process(); } } }
/**
 * Forgets the table: removes it from the cache (warning if it was not cached)
 * and deletes its znode via {@code ZKUtil.deleteNodeFailSilent}.
 * Sets no watches.
 *
 * {@inheritDoc}
 *
 * @throws CoordinatedStateException wrapping any {@link KeeperException} from the delete
 */
@Override
public void setDeletedTable(final TableName tableName)
    throws CoordinatedStateException {
  synchronized (this.cache) {
    final Object previousState = this.cache.remove(tableName);
    if (previousState == null) {
      LOG.warn("Moving table " + tableName + " state to deleted but was already deleted");
    }
    try {
      final String znode =
          ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString());
      ZKUtil.deleteNodeFailSilent(this.watcher, znode);
    } catch (KeeperException zkFailure) {
      throw new CoordinatedStateException(zkFailure);
    }
  }
}
@Override protected void handleTableOperation(List<HRegionInfo> regions) throws IOException, CoordinatedStateException { MasterCoprocessorHost cpHost = ((HMaster) this.server).getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preTruncateTableHandler(this.tableName); } // 1. Wait because of region in transition waitRegionInTransition(regions); // 2. Remove table from hbase:meta and HDFS removeTableData(regions); // ----------------------------------------------------------------------- // PONR: At this point the table is deleted. // If the recreate fails, the user can only re-create the table. // ----------------------------------------------------------------------- // 3. Recreate the regions recreateTable(regions); if (cpHost != null) { cpHost.postTruncateTableHandler(this.tableName); } }
static void checkAndSetEnablingTable(final AssignmentManager assignmentManager, final TableName tableName) throws IOException { // If we have multiple client threads trying to create the table at the // same time, given the async nature of the operation, the table // could be in a state where hbase:meta table hasn't been updated yet in // the process() function. // Use enabling state to tell if there is already a request for the same // table in progress. This will introduce a new zookeeper call. Given // createTable isn't a frequent operation, that should be ok. // TODO: now that we have table locks, re-evaluate above -- table locks are not enough. // We could have cleared the hbase.rootdir and not zk. How can we detect this case? // Having to clean zk AND hdfs is awkward. try { if (!assignmentManager.getTableStateManager().setTableStateIfNotInStates(tableName, ZooKeeperProtos.Table.State.ENABLING, ZooKeeperProtos.Table.State.ENABLING, ZooKeeperProtos.Table.State.ENABLED)) { throw new TableExistsException(tableName); } } catch (CoordinatedStateException e) { throw new IOException("Unable to ensure that the table will be" + " enabling because of a ZooKeeper issue", e); } }
/** * Run a simple server shutdown handler. * @throws KeeperException * @throws IOException */ @Test (timeout=180000) public void testShutdownHandler() throws KeeperException, IOException, CoordinatedStateException, ServiceException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testShutdownHandler"); // Create an AM. AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager( this.server, this.serverManager); try { processServerShutdownHandler(am, false); } finally { executor.shutdown(); am.shutdown(); // Clean up all znodes ZKAssign.deleteAllNodes(this.watcher); } }
@Override protected void handleTableOperation(List<HRegionInfo> regions) throws IOException, CoordinatedStateException { MasterCoprocessorHost cpHost = ((HMaster) this.server).getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preTruncateTableHandler(this.tableName); } // 1. Wait because of region in transition waitRegionInTransition(regions); // 2. Remove table from .META. and HDFS removeTableData(regions); // ----------------------------------------------------------------------- // PONR: At this point the table is deleted. // If the recreate fails, the user can only re-create the table. // ----------------------------------------------------------------------- // 3. Recreate the regions recreateTable(regions); if (cpHost != null) { cpHost.postTruncateTableHandler(this.tableName); } }
/** * Called on startup. * Figures whether a fresh cluster start of we are joining extant running cluster. * @throws IOException * @throws KeeperException * @throws InterruptedException * @throws CoordinatedStateException */ void joinCluster() throws IOException, KeeperException, InterruptedException, CoordinatedStateException { // Concurrency note: In the below the accesses on regionsInTransition are // outside of a synchronization block where usually all accesses to RIT are // synchronized. The presumption is that in this case it is safe since this // method is being played by a single thread on startup. // TODO: Regions that have a null location and are not in regionsInTransitions // need to be handled. // Scan hbase:meta to build list of existing regions, servers, and assignment // Returns servers who have not checked in (assumed dead) and their regions Map<ServerName, List<HRegionInfo>> deadServers; deadServers = rebuildUserRegions(); // This method will assign all user regions if a clean server startup or // it will reconstruct master state and cleanup any leftovers from // previous master process. processDeadServersAndRegionsInTransition(deadServers); recoverTableInDisablingState(); recoverTableInEnablingState(); }
/** * Recover the tables that were not fully moved to DISABLED state. These * tables are in DISABLING state when the master restarted/switched. * * @throws KeeperException * @throws TableNotFoundException * @throws IOException */ private void recoverTableInDisablingState() throws KeeperException, IOException, CoordinatedStateException { Set<TableName> disablingTables = tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING); if (disablingTables.size() != 0) { for (TableName tableName : disablingTables) { // Recover by calling DisableTableHandler LOG.info("The table " + tableName + " is in DISABLING state. Hence recovering by moving the table" + " to DISABLED state."); new DisableTableHandler(this.server, tableName, catalogTracker, this, tableLockManager, true).prepare().process(); } } }
/** * Recover the tables that are not fully moved to ENABLED state. These tables * are in ENABLING state when the master restarted/switched * * @throws KeeperException * @throws org.apache.hadoop.hbase.TableNotFoundException * @throws IOException */ private void recoverTableInEnablingState() throws KeeperException, IOException, CoordinatedStateException { Set<TableName> enablingTables = tableStateManager. getTablesInStates(ZooKeeperProtos.Table.State.ENABLING); if (enablingTables.size() != 0) { for (TableName tableName : enablingTables) { // Recover by calling EnableTableHandler LOG.info("The table " + tableName + " is in ENABLING state. Hence recovering by moving the table" + " to ENABLED state."); // enableTable in sync way during master startup, // no need to invoke coprocessor EnableTableHandler eth = new EnableTableHandler(this.server, tableName, catalogTracker, this, tableLockManager, true); try { eth.prepare(); } catch (TableNotFoundException e) { LOG.warn("Table " + tableName + " not found in hbase:meta to recover."); continue; } eth.process(); } } }
/** * Run a simple server shutdown handler. * @throws KeeperException * @throws IOException */ @Test public void testShutdownHandler() throws KeeperException, IOException, CoordinatedStateException, ServiceException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testShutdownHandler"); // We need a mocked catalog tracker. CatalogTracker ct = Mockito.mock(CatalogTracker.class); // Create an AM. AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager( this.server, this.serverManager); try { processServerShutdownHandler(ct, am, false); } finally { executor.shutdown(); am.shutdown(); // Clean up all znodes ZKAssign.deleteAllNodes(this.watcher); } }
/**
 * Writes {@code newState} for the table into ZK unless the table is currently
 * in one of {@code states}. Sets no watches.
 *
 * {@inheritDoc}
 *
 * @return {@code true} if the new state was written, {@code false} if the table
 *   was already in one of the given states
 * @throws CoordinatedStateException wrapping any {@link KeeperException} raised by the write
 */
@Override
public boolean setTableStateIfNotInStates(TableName tableName,
    ZooKeeperProtos.Table.State newState,
    ZooKeeperProtos.Table.State... states) throws CoordinatedStateException {
  synchronized (this.cache) {
    final boolean inExcludedState = isTableState(tableName, states);
    if (!inExcludedState) {
      try {
        setTableStateInZK(tableName, newState);
      } catch (KeeperException ke) {
        throw new CoordinatedStateException(ke);
      }
      return true;
    }
    return false;
  }
}
/**
 * Builds a new {@link ZKTableStateManager} on top of the server's ZooKeeper handle.
 *
 * @return a freshly constructed table state manager
 * @throws CoordinatedStateException wrapping any {@link KeeperException} from construction
 */
@Override
public TableStateManager getTableStateManager()
    throws InterruptedException, CoordinatedStateException {
  final TableStateManager manager;
  try {
    manager = new ZKTableStateManager(server.getZooKeeper());
  } catch (KeeperException ke) {
    throw new CoordinatedStateException(ke);
  }
  return manager;
}
/**
 * Unconditionally writes the given state for the table into ZK, logging the
 * previously cached state and the new one at INFO level. Sets no watches.
 *
 * {@inheritDoc}
 *
 * @throws CoordinatedStateException wrapping any {@link KeeperException} raised by the write
 */
@Override
public void setTableState(TableName tableName, ZooKeeperProtos.Table.State state)
    throws CoordinatedStateException {
  synchronized (this.cache) {
    final String transition = "Moving table " + tableName + " state from "
        + this.cache.get(tableName) + " to " + state;
    LOG.info(transition);
    try {
      setTableStateInZK(tableName, state);
    } catch (KeeperException ke) {
      throw new CoordinatedStateException(ke);
    }
  }
}
/**
 * Gets the set of tables for the given states by delegating to
 * {@code getAllTables(states)}.
 *
 * @param states states to look up
 * @return the set returned by {@code getAllTables}
 * @throws CoordinatedStateException if error happened in underlying coordination engine
 */
@Override
public Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... states)
    throws InterruptedIOException, CoordinatedStateException {
  final Set<TableName> matching;
  try {
    matching = getAllTables(states);
  } catch (KeeperException ke) {
    throw new CoordinatedStateException(ke);
  }
  return matching;
}
/**
 * Periodic task body: asks the master to normalize regions. Failures are
 * logged and not propagated.
 */
@Override
protected void chore() {
  try {
    master.normalizeRegions();
  } catch (IOException | CoordinatedStateException failure) {
    // Best effort: record the failure and return normally.
    LOG.error("Failed to normalize regions.", failure);
  }
}
static void checkAndSetEnablingTable(final AssignmentManager assignmentManager, final TableName tableName, boolean skipTableStateCheck) throws IOException { // If we have multiple client threads trying to create the table at the // same time, given the async nature of the operation, the table // could be in a state where hbase:meta table hasn't been updated yet in // the process() function. // Use enabling state to tell if there is already a request for the same // table in progress. This will introduce a new zookeeper call. Given // createTable isn't a frequent operation, that should be ok. // TODO: now that we have table locks, re-evaluate above -- table locks are not enough. // We could have cleared the hbase.rootdir and not zk. How can we detect this case? // Having to clean zk AND hdfs is awkward. try { if (skipTableStateCheck) { assignmentManager.getTableStateManager().setTableState( tableName, ZooKeeperProtos.Table.State.ENABLING); } else if (!assignmentManager.getTableStateManager().setTableStateIfNotInStates( tableName, ZooKeeperProtos.Table.State.ENABLING, ZooKeeperProtos.Table.State.ENABLING, ZooKeeperProtos.Table.State.ENABLED)) { throw new TableExistsException(tableName); } } catch (CoordinatedStateException e) { throw new IOException("Unable to ensure that the table will be" + " enabling because of a ZooKeeper issue", e); } }
static void removeEnablingTable(final AssignmentManager assignmentManager, final TableName tableName) { // Try deleting the enabling node in case of error // If this does not happen then if the client tries to create the table // again with the same Active master // It will block the creation saying TableAlreadyExists. try { assignmentManager.getTableStateManager().checkAndRemoveTableState(tableName, ZooKeeperProtos.Table.State.ENABLING, false); } catch (CoordinatedStateException e) { // Keeper exception should not happen here LOG.error("Got a keeper exception while removing the ENABLING table znode " + tableName, e); } }
private void handleDisableTable() throws IOException, CoordinatedStateException { // Set table disabling flag up in zk. this.assignmentManager.getTableStateManager().setTableState(this.tableName, ZooKeeperProtos.Table.State.DISABLING); boolean done = false; while (true) { // Get list of online regions that are of this table. Regions that are // already closed will not be included in this list; i.e. the returned // list is not ALL regions in a table, its all online regions according // to the in-memory state on this master. final List<HRegionInfo> regions = this.assignmentManager .getRegionStates().getRegionsOfTable(tableName); if (regions.size() == 0) { done = true; break; } LOG.info("Offlining " + regions.size() + " regions."); BulkDisabler bd = new BulkDisabler(this.server, regions); try { if (bd.bulkAssign()) { done = true; break; } } catch (InterruptedException e) { LOG.warn("Disable was interrupted"); // Preserve the interrupt. Thread.currentThread().interrupt(); break; } } // Flip the table to disabled if success. if (done) this.assignmentManager.getTableStateManager().setTableState(this.tableName, ZooKeeperProtos.Table.State.DISABLED); LOG.info("Disabled table, " + this.tableName + ", is done=" + done); }
/**
 * RPC entry point that triggers region normalization on the master and reports
 * the value returned by {@code master.normalizeRegions()} in the response.
 *
 * @throws ServiceException wrapping an {@link IOException} or
 *   {@link CoordinatedStateException} raised by the normalization
 */
@Override
public NormalizeResponse normalize(RpcController controller,
    NormalizeRequest request) throws ServiceException {
  try {
    final NormalizeResponse.Builder response = NormalizeResponse.newBuilder();
    response.setNormalizerRan(master.normalizeRegions());
    return response.build();
  } catch (IOException | CoordinatedStateException failure) {
    throw new ServiceException(failure);
  }
}
/** * Called on startup. * Figures whether a fresh cluster start of we are joining extant running cluster. * @throws IOException * @throws KeeperException * @throws InterruptedException * @throws CoordinatedStateException */ void joinCluster() throws IOException, KeeperException, InterruptedException, CoordinatedStateException { long startTime = System.currentTimeMillis(); // Concurrency note: In the below the accesses on regionsInTransition are // outside of a synchronization block where usually all accesses to RIT are // synchronized. The presumption is that in this case it is safe since this // method is being played by a single thread on startup. // TODO: Regions that have a null location and are not in regionsInTransitions // need to be handled. // Scan hbase:meta to build list of existing regions, servers, and assignment // Returns servers who have not checked in (assumed dead) that some regions // were assigned to (according to the meta) Set<ServerName> deadServers = rebuildUserRegions(); // This method will assign all user regions if a clean server startup or // it will reconstruct master state and cleanup any leftovers from previous master process. boolean failover = processDeadServersAndRegionsInTransition(deadServers); if (!useZKForAssignment) { // Not use ZK for assignment any more, remove the ZNode ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode); } recoverTableInDisablingState(); recoverTableInEnablingState(); LOG.info("Joined the cluster in " + (System.currentTimeMillis() - startTime) + "ms, failover=" + failover); }
protected void setEnabledTable(TableName tableName) { try { this.tableStateManager.setTableState(tableName, ZooKeeperProtos.Table.State.ENABLED); } catch (CoordinatedStateException e) { // here we can abort as it is the start up flow String errorMsg = "Unable to ensure that the table " + tableName + " will be" + " enabled because of a ZooKeeper issue"; LOG.error(errorMsg); this.server.abort(errorMsg, e); } }
/**
 * Unconditionally writes the given state for the table into ZK, logging the
 * previously cached state and the new one at WARN level. Sets no watches.
 *
 * {@inheritDoc}
 *
 * @throws CoordinatedStateException wrapping any {@link KeeperException} raised by the write
 */
@Override
public void setTableState(TableName tableName, ZooKeeperProtos.Table.State state)
    throws CoordinatedStateException {
  synchronized (this.cache) {
    final String transition = "Moving table " + tableName + " state from "
        + this.cache.get(tableName) + " to " + state;
    LOG.warn(transition);
    try {
      setTableStateInZK(tableName, state);
    } catch (KeeperException zkFailure) {
      throw new CoordinatedStateException(zkFailure);
    }
  }
}
/**
 * Waits, one region at a time, for each region to leave transition. Each region
 * gets up to {@code hbase.master.wait.on.region} milliseconds (default five
 * minutes), polled every {@code waitingTimeForEvents} ms. A region seen in
 * FAILED_OPEN state is marked offline while polling.
 *
 * @param regions regions to wait on
 * @throws InterruptedIOException if the sleep between polls is interrupted
 * @throws IOException if a region is still in transition once its wait time is used up
 */
protected void waitRegionInTransition(final List<HRegionInfo> regions)
    throws IOException, CoordinatedStateException {
  final AssignmentManager assignmentManager = this.masterServices.getAssignmentManager();
  final RegionStates regionStates = assignmentManager.getRegionStates();
  final long waitTime =
      server.getConfiguration().getLong("hbase.master.wait.on.region", 5 * 60 * 1000);

  for (HRegionInfo hri : regions) {
    final long deadline = System.currentTimeMillis() + waitTime;
    while (System.currentTimeMillis() < deadline) {
      if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
        assignmentManager.regionOffline(hri);
      }
      if (!regionStates.isRegionInTransition(hri)) {
        break;
      }
      try {
        Thread.sleep(waitingTimeForEvents);
      } catch (InterruptedException interrupted) {
        LOG.warn("Interrupted while sleeping");
        final InterruptedIOException wrapped = new InterruptedIOException();
        wrapped.initCause(interrupted);
        throw wrapped;
      }
      LOG.debug("Waiting on region to clear regions in transition; "
          + assignmentManager.getRegionStates().getRegionTransitionState(hri));
    }

    // Re-check after the loop: the deadline may have passed without the region settling.
    if (regionStates.isRegionInTransition(hri)) {
      throw new IOException("Waited hbase.master.wait.on.region (" + waitTime
          + "ms) for region to leave region " + hri.getRegionNameAsString()
          + " in transitions");
    }
  }
}
@Override protected void handleTableOperation(List<HRegionInfo> regions) throws IOException, CoordinatedStateException { MasterCoprocessorHost cpHost = ((HMaster) this.server).getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preDeleteTableHandler(this.tableName); } // 1. Wait because of region in transition waitRegionInTransition(regions); try { // 2. Remove table from hbase:meta and HDFS removeTableData(regions); } finally { // 3. Update table descriptor cache LOG.debug("Removing '" + tableName + "' descriptor."); this.masterServices.getTableDescriptors().remove(tableName); AssignmentManager am = this.masterServices.getAssignmentManager(); // 4. Clean up regions of the table in RegionStates. LOG.debug("Removing '" + tableName + "' from region states."); am.getRegionStates().tableDeleted(tableName); // 5. If entry for this table in zk, and up in AssignmentManager, remove it. LOG.debug("Marking '" + tableName + "' as deleted."); am.getTableStateManager().setDeletedTable(tableName); // 6.Clean any remaining rows for this table. cleanAnyRemainingRows(); } if (cpHost != null) { cpHost.postDeleteTableHandler(this.tableName); } }
/** * Removes the table from hbase:meta and archives the HDFS files. */ protected void removeTableData(final List<HRegionInfo> regions) throws IOException, CoordinatedStateException { // 1. Remove regions from META LOG.debug("Deleting regions from META"); MetaTableAccessor.deleteRegions(this.server.getConnection(), regions); // ----------------------------------------------------------------------- // NOTE: At this point we still have data on disk, but nothing in hbase:meta // if the rename below fails, hbck will report an inconsistency. // ----------------------------------------------------------------------- // 2. Move the table in /hbase/.tmp MasterFileSystem mfs = this.masterServices.getMasterFileSystem(); Path tempTableDir = mfs.moveTableToTemp(tableName); // 3. Archive regions from FS (temp directory) FileSystem fs = mfs.getFileSystem(); for (HRegionInfo hri : regions) { LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS"); HFileArchiver.archiveRegion(fs, mfs.getRootDir(), tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName())); } // 4. Delete table directory from FS (temp directory) if (!fs.delete(tempTableDir, true)) { LOG.error("Couldn't delete " + tempTableDir); } LOG.debug("Table '" + tableName + "' archived!"); }