protected static List<HRegionInfo> addTableToMeta(final MasterProcedureEnv env, final HTableDescriptor hTableDescriptor, final List<HRegionInfo> regions) throws IOException { if (regions != null && regions.size() > 0) { ProcedureSyncWait.waitMetaRegions(env); // Add regions to META addRegionsToMeta(env, hTableDescriptor, regions); // Add replicas if needed List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions); // Setup replication for region replicas if needed if (hTableDescriptor.getRegionReplication() > 1) { ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); } return newRegions; } return regions; }
/** * Trigger a flush in the primary region replica if this region is a secondary replica. Does not * block this thread. See RegionReplicaFlushHandler for details. */ void triggerFlushInPrimaryRegion(final HRegion region) { if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { return; } if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) { region.setReadsEnabled(true); return; } region.setReadsEnabled(false); // disable reads before marking the region as opened. // RegionReplicaFlushHandler might reset this. // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler this.service.submit( new RegionReplicaFlushHandler(this, clusterConnection, rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); }
@BeforeClass public static void beforeClass() throws Exception { Configuration conf = HTU.getConfiguration(); conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); conf.setInt("replication.source.size.capacity", 10240); conf.setLong("replication.source.sleepforretries", 100); conf.setInt("hbase.regionserver.maxlogs", 10); conf.setLong("hbase.master.logcleaner.ttl", 10); conf.setInt("zookeeper.recovery.retry", 1); conf.setInt("zookeeper.recovery.retry.intervalmill", 10); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); conf.setInt("replication.stats.thread.period.seconds", 5); conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed conf.setInt("hbase.client.serverside.retries.multiplier", 1); HTU.startMiniCluster(NB_SERVERS); }
@Before public void before() throws Exception { Configuration conf = HTU.getConfiguration(); // Up the handlers; this test needs more than usual. conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true); conf.setInt("replication.stats.thread.period.seconds", 5); conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay); HTU.startMiniCluster(NB_SERVERS); htd = HTU.createTableDescriptor( name.getMethodName().substring(0, name.getMethodName().length()-3)); htd.setRegionReplication(3); HTU.getHBaseAdmin().createTable(htd); }
@Override public void setConf(Configuration conf) { conf.setIfUnset( String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION), String.valueOf(DEFAULT_REGION_REPLICATION)); conf.setIfUnset( String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES), StringUtils.join(",", DEFAULT_COLUMN_FAMILIES)); conf.setBoolean("hbase.table.sanity.checks", true); // enable async wal replication to region replicas for unit tests conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB conf.setInt("hbase.hstore.blockingStoreFiles", 100); super.setConf(conf); }
/** * Trigger a flush in the primary region replica if this region is a secondary replica. Does not * block this thread. See RegionReplicaFlushHandler for details. */ void triggerFlushInPrimaryRegion(final HRegion region) { if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { return; } if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled( region.conf)) { region.setReadsEnabled(true); return; } region.setReadsEnabled(false); // disable reads before marking the region as opened. // RegionReplicaFlushHandler might reset this. // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler if (this.executorService != null) { this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); } }
/**
 * Lists the store files found in the directory of the given column family.
 *
 * @param familyName Column Family Name
 * @param validate when {@code true}, entries rejected by {@link StoreFileInfo#isValid} are
 *          logged and left out of the result
 * @return the {@link StoreFileInfo}s for the specified family, or {@code null} when the
 *         directory listing yields {@code null}
 * @throws IOException propagated from the filesystem or from building a {@link StoreFileInfo}
 */
public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
    throws IOException {
  final Path familyDir = getStoreDir(familyName);
  final FileStatus[] listing = FSUtils.listStatus(this.fs, familyDir);
  if (listing == null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("No StoreFiles for: " + familyDir);
    }
    return null;
  }

  final ArrayList<StoreFileInfo> result = new ArrayList<>(listing.length);
  for (FileStatus candidate : listing) {
    // isValid is only consulted when validation was requested.
    if (!validate || StoreFileInfo.isValid(candidate)) {
      result.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
          regionInfoForFs, familyName, candidate.getPath()));
    } else {
      LOG.warn("Invalid StoreFile: " + candidate.getPath());
    }
  }
  return result;
}
@BeforeClass public static void beforeClass() throws Exception { Configuration conf = HTU.getConfiguration(); conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); conf.setInt("replication.source.size.capacity", 10240); conf.setLong("replication.source.sleepforretries", 100); conf.setInt("hbase.regionserver.maxlogs", 10); conf.setLong("hbase.master.logcleaner.ttl", 10); conf.setInt("zookeeper.recovery.retry", 1); conf.setInt("zookeeper.recovery.retry.intervalmill", 10); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); conf.setInt("replication.stats.thread.period.seconds", 5); conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); HTU.startMiniCluster(NB_SERVERS); }
@Before public void before() throws Exception { Configuration conf = HTU.getConfiguration(); // Up the handlers; this test needs more than usual. conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true); conf.setInt("replication.stats.thread.period.seconds", 5); conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); HTU.startMiniCluster(NB_SERVERS); htd = HTU.createTableDescriptor( name.getMethodName().substring(0, name.getMethodName().length()-3)); htd.setRegionReplication(3); HTU.getAdmin().createTable(htd); }
@Override public void setConf(Configuration conf) { conf.setIfUnset( String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION), String.valueOf(DEFAULT_REGION_REPLICATION)); conf.setIfUnset( String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES), StringUtils.join(",", DEFAULT_COLUMN_FAMILIES)); conf.setBoolean("hbase.table.sanity.checks", true); // enable async wal replication to region replicas for unit tests conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB conf.setInt("hbase.hstore.blockingStoreFiles", 100); super.setConf(conf); }
/** * update replica column families if necessary. * @param env MasterProcedureEnv * @throws IOException */ private void updateReplicaColumnsIfNeeded( final MasterProcedureEnv env, final HTableDescriptor oldHTableDescriptor, final HTableDescriptor newHTableDescriptor) throws IOException { final int oldReplicaCount = oldHTableDescriptor.getRegionReplication(); final int newReplicaCount = newHTableDescriptor.getRegionReplication(); if (newReplicaCount < oldReplicaCount) { Set<byte[]> tableRows = new HashSet<byte[]>(); Connection connection = env.getMasterServices().getConnection(); Scan scan = MetaTableAccessor.getScanForTableName(getTableName()); scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) { ResultScanner resScanner = metaTable.getScanner(scan); for (Result result : resScanner) { tableRows.add(result.getRow()); } MetaTableAccessor.removeRegionReplicasFromMeta( tableRows, newReplicaCount, oldReplicaCount - newReplicaCount, connection); } } // Setup replication for region replicas if needed if (newReplicaCount > 1 && oldReplicaCount <= 1) { ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); } }
/** * Create a view to the on-disk region * * @param conf the {@link Configuration} to use * @param fs {@link FileSystem} that contains the region * @param tableDir {@link Path} to where the table is being stored * @param regionInfo {@link HRegionInfo} for region */ HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir, final HRegionInfo regionInfo) { this.fs = fs; this.conf = conf; this.tableDir = tableDir; this.regionInfo = regionInfo; this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo); this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number", DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", DEFAULT_BASE_SLEEP_BEFORE_RETRIES); }
/** * Returns the store files available for the family. This methods performs the filtering based on * the valid store files. * * @param familyName Column Family Name * @return a set of {@link StoreFileInfo} for the specified family. */ public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate) throws IOException { Path familyDir = getStoreDir(familyName); // FileStatus[] files = FSUtils.listStatus(this.fs, familyDir); FileStatus[] files = FSUtils.listStatus(this.fs, familyDir, new PathFilter() { @Override public boolean accept(Path path) { String name = path.getName(); if (name.endsWith(IndexConstants.REGION_INDEX_DIR_NAME) || name .endsWith(LMDIndexConstants.BUCKET_FILE_SUFFIX) || name .endsWith(LMDIndexConstants.DATA_FILE_SUFFIX)) return false; return true; } }); if (files == null) { LOG.debug("No StoreFiles for: " + familyDir); return null; } ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length); for (FileStatus status : files) { if (validate && !StoreFileInfo.isValid(status)) { LOG.warn("Invalid StoreFile: " + status.getPath()); continue; } StoreFileInfo info = ServerRegionReplicaUtil .getStoreFileInfo(conf, fs, regionInfo, regionInfoForFs, familyName, status.getPath()); storeFiles.add(info); } return storeFiles; }
/**
 * Builds the store file information for one file of a column family.
 *
 * @param familyName Column Family Name
 * @param fileName File Name, resolved against the family's store directory
 * @return The {@link StoreFileInfo} for the specified family/file
 * @throws IOException propagated from building the {@link StoreFileInfo}
 */
StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
    throws IOException {
  final Path storeFilePath = new Path(getStoreDir(familyName), fileName);
  return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo, regionInfoForFs,
      familyName, storeFilePath);
}
@BeforeClass public static void beforeClass() throws Exception { Configuration conf = HTU.getConfiguration(); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); // install WALObserver coprocessor for tests String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY); if (walCoprocs == null) { walCoprocs = WALEditCopro.class.getName(); } else { walCoprocs += "," + WALEditCopro.class.getName(); } HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, walCoprocs); HTU.startMiniCluster(NB_SERVERS); // Create table then get the single region for our new table. HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); table = HTU.createTable(htd, new byte[][]{f}, HTU.getConfiguration()); hriPrimary = table.getRegionLocation(row, false).getRegionInfo(); // mock a secondary region info to open hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); // No master TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); rs0 = HTU.getMiniHBaseCluster().getRegionServer(0); rs1 = HTU.getMiniHBaseCluster().getRegionServer(1); }
/** * Create a view to the on-disk region * @param conf the {@link Configuration} to use * @param fs {@link FileSystem} that contains the region * @param tableDir {@link Path} to where the table is being stored * @param regionInfo {@link HRegionInfo} for region */ HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir, final HRegionInfo regionInfo) { this.fs = fs; this.conf = conf; this.tableDir = tableDir; this.regionInfo = regionInfo; this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo); this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number", DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", DEFAULT_BASE_SLEEP_BEFORE_RETRIES); }
/** * Returns the store files available for the family. * This methods performs the filtering based on the valid store files. * @param familyName Column Family Name * @return a set of {@link StoreFileInfo} for the specified family. */ public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate) throws IOException { Path familyDir = getStoreDir(familyName); FileStatus[] files = FSUtils.listStatus(this.fs, familyDir); if (files == null) { LOG.debug("No StoreFiles for: " + familyDir); return null; } ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length); for (FileStatus status: files) { if(!status.getPath().getName().endsWith(".parquet")) { if (validate && !StoreFileInfo.isValid(status)) { LOG.warn("Invalid StoreFile: " + status.getPath()); continue; } StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo, regionInfoForFs, familyName, status); storeFiles.add(info); }else { //@author wangxiaoyi //TODO : restore the parquet files into memory } } return storeFiles; }
/** * Create a view to the on-disk region * @param conf the {@link Configuration} to use * @param fs {@link FileSystem} that contains the region * @param tableDir {@link Path} to where the table is being stored * @param regionInfo {@link RegionInfo} for region */ HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path tableDir, final RegionInfo regionInfo) { this.fs = fs; this.conf = conf; this.tableDir = tableDir; this.regionInfo = regionInfo; this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo); this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number", DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", DEFAULT_BASE_SLEEP_BEFORE_RETRIES); }
@BeforeClass public static void beforeClass() throws Exception { Configuration conf = HTU.getConfiguration(); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); // install WALObserver coprocessor for tests String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY); if (walCoprocs == null) { walCoprocs = WALEditCopro.class.getName(); } else { walCoprocs += "," + WALEditCopro.class.getName(); } HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, walCoprocs); HTU.startMiniCluster(NB_SERVERS); // Create table then get the single region for our new table. HTableDescriptor htd = HTU.createTableDescriptor(tableName.getNameAsString()); table = HTU.createTable(htd, new byte[][]{f}, null); try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) { hriPrimary = locator.getRegionLocation(row, false).getRegionInfo(); } // mock a secondary region info to open hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); // No master TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); rs0 = HTU.getMiniHBaseCluster().getRegionServer(0); rs1 = HTU.getMiniHBaseCluster().getRegionServer(1); }
/** * Responsible of table creation (on-disk and META) and assignment. * - Create the table directory and descriptor (temp folder) * - Create the on-disk regions (temp folder) * [If something fails here: we've just some trash in temp] * - Move the table from temp to the root directory * [If something fails here: we've the table in place but some of the rows required * present in META. (hbck needed)] * - Add regions to META * [If something fails here: we don't have regions assigned: table disabled] * - Assign regions to Region Servers * [If something fails here: we still have the table in disabled state] * - Update ZooKeeper with the enabled state */ private void handleCreateTable(TableName tableName) throws IOException, CoordinatedStateException { Path tempdir = fileSystemManager.getTempDir(); FileSystem fs = fileSystemManager.getFileSystem(); // 1. Create Table Descriptor Path tempTableDir = FSUtils.getTableDir(tempdir, tableName); new FSTableDescriptors(this.conf).createTableDescriptorForTableDirectory( tempTableDir, this.hTableDescriptor, false); Path tableDir = FSUtils.getTableDir(fileSystemManager.getRootDir(), tableName); // 2. Create Regions List<HRegionInfo> regionInfos = handleCreateHdfsRegions(tempdir, tableName); // 3. Move Table temp directory to the hbase root location if (!fs.rename(tempTableDir, tableDir)) { throw new IOException("Unable to move table from temp=" + tempTableDir + " to hbase root=" + tableDir); } if (regionInfos != null && regionInfos.size() > 0) { // 4. Add regions to META addRegionsToMeta(regionInfos, hTableDescriptor.getRegionReplication()); // 5. Add replicas if needed regionInfos = addReplicas(hTableDescriptor, regionInfos); // 6. Setup replication for region replicas if needed if (hTableDescriptor.getRegionReplication() > 1) { ServerRegionReplicaUtil.setupRegionReplicaReplication(conf); } // 7. 
Trigger immediate assignment of the regions in round-robin fashion ModifyRegionUtils.assignRegions(assignmentManager, regionInfos); } // 8. Set table enabled flag up in zk. try { assignmentManager.getTableStateManager().setTableState(tableName, ZooKeeperProtos.Table.State.ENABLED); } catch (CoordinatedStateException e) { throw new IOException("Unable to ensure that " + tableName + " will be" + " enabled because of a ZooKeeper issue", e); } // 8. Update the tabledescriptor cache. ((HMaster) this.server).getTableDescriptors().get(tableName); }
private void startServiceThreads() throws IOException { // Start executor services this.service.startExecutorService(ExecutorType.RS_OPEN_REGION, conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_OPEN_META, conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION, conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_CLOSE_META, conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, conf.getInt("hbase.regionserver.region.replica.flusher.threads", conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); } Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore); if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner); // Leases is not a Thread. Internally it runs a daemon thread. 
If it gets // an unhandled exception, it will just exit. Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker", uncaughtExceptionHandler); if (this.replicationSourceHandler == this.replicationSinkHandler && this.replicationSourceHandler != null) { this.replicationSourceHandler.startReplicationService(); } else { if (this.replicationSourceHandler != null) { this.replicationSourceHandler.startReplicationService(); } if (this.replicationSinkHandler != null) { this.replicationSinkHandler.startReplicationService(); } } // Create the log splitting worker and start it // set a smaller retries to fast fail otherwise splitlogworker could be blocked for // quite a while inside HConnection layer. The worker won't be available for other // tasks even after current task is preempted after a split task times out. Configuration sinkConf = HBaseConfiguration.create(conf); sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory); splitLogWorker.start(); }
void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException { long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); int maxAttempts = getRetriesCount(connection.getConfiguration()); RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create(); if (LOG.isDebugEnabled()) { LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " + region.getRegionInfo().getEncodedName() + " to trigger a flush"); } while (!region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped()) { FlushRegionCallable flushCallable = new FlushRegionCallable( connection, rpcControllerFactory, RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true); // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we // do not have to wait for the whole flush here, just initiate it. FlushRegionResponse response = null; try { response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller() .callWithRetries(flushCallable, this.operationTimeout); } catch (IOException ex) { if (ex instanceof TableNotFoundException || connection.isTableDisabled(region.getRegionInfo().getTable())) { return; } throw ex; } if (response.getFlushed()) { // then we have to wait for seeing the flush entry. 
All reads will be rejected until we see // a complete flush cycle or replay a region open event if (LOG.isDebugEnabled()) { LOG.debug("Successfully triggered a flush of primary region replica " + ServerRegionReplicaUtil .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and blocking reads until observing a full flush cycle"); } break; } else { if (response.hasWroteFlushWalMarker()) { if(response.getWroteFlushWalMarker()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " + "region replica " + ServerRegionReplicaUtil .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and " + "blocking reads until observing a flush marker"); } break; } else { // somehow we were not able to get the primary to write the flush request. It may be // closing or already flushing. Retry flush again after some sleep. if (!counter.shouldRetry()) { throw new IOException("Cannot cause primary to flush or drop a wal marker after " + "retries. Failing opening of this region replica " + region.getRegionInfo().getEncodedName()); } } } else { // nothing to do. Are we dealing with an old server? LOG.warn("Was not able to trigger a flush from primary region due to old server version? " + "Continuing to open the secondary region replica: " + region.getRegionInfo().getEncodedName()); region.setReadsEnabled(true); break; } } try { counter.sleepUntilNextRetry(); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } } }
void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException { checkTargetRegion(flush.getEncodedRegionName().toByteArray(), "Flush marker from WAL ", flush); if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return; // if primary nothing to do } if (LOG.isDebugEnabled()) { LOG.debug(getRegionInfo().getEncodedName() + " : " + "Replaying flush marker " + TextFormat .shortDebugString(flush)); } startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to // guard against close try { FlushAction action = flush.getAction(); switch (action) { case START_FLUSH: replayWALFlushStartMarker(flush); break; case COMMIT_FLUSH: replayWALFlushCommitMarker(flush); break; case ABORT_FLUSH: replayWALFlushAbortMarker(flush); break; case CANNOT_FLUSH: replayWALFlushCannotFlushMarker(flush, replaySeqId); break; default: LOG.warn(getRegionInfo().getEncodedName() + " : " + "Received a flush event with unknown action, ignoring. " + TextFormat .shortDebugString(flush)); break; } logRegionFiles(); } finally { closeRegionOperation(Operation.REPLAY_EVENT); } }
public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable) throws Exception { // tests having edits from a disabled or dropped table is handled correctly by skipping those // entries and further edits after the edits from dropped/disabled table can be replicated // without problems. TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables" + dropTable); HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); int regionReplication = 3; htd.setRegionReplication(regionReplication); HTU.deleteTableIfAny(tableName); HTU.getHBaseAdmin().createTable(htd); TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable"); HTU.deleteTableIfAny(toBeDisabledTable); htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); htd.setRegionReplication(regionReplication); HTU.getHBaseAdmin().createTable(htd); // both tables are created, now pause replication ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); // now that the replication is disabled, write to the table to be dropped, then drop the table. 
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); Table table = connection.getTable(tableName); Table tableToBeDisabled = connection.getTable(toBeDisabledTable); HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); AtomicLong skippedEdits = new AtomicLong(); RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE); RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); Entry entry = new Entry( new WALKey(encodedRegionName, toBeDisabledTable, 1), new WALEdit()); HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table if (dropTable) { HTU.getHBaseAdmin().deleteTable(toBeDisabledTable); } sinkWriter.append(toBeDisabledTable, encodedRegionName, HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); assertEquals(2, skippedEdits.get()); try { // load some data to the to-be-dropped table // load the data to the table HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); // now enable the replication admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); verifyReplication(tableName, regionReplication, 0, 1000); } finally { admin.close(); table.close(); rl.close(); tableToBeDisabled.close(); HTU.deleteTableIfAny(toBeDisabledTable); connection.close(); } }
private long initializeRegionInternals(final CancelableProgressable reporter, final MonitoredTask status) throws IOException, UnsupportedEncodingException { if (coprocessorHost != null) { status.setStatus("Running coprocessor pre-open hook"); coprocessorHost.preOpen(); } // Write HRI to a file in case we need to recover hbase:meta status.setStatus("Writing region info on filesystem"); fs.checkRegionInfoOnFilesystem(); // Initialize all the HStores status.setStatus("Initializing all the Stores"); long maxSeqId = initializeRegionStores(reporter, status); this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this)); this.writestate.flushRequested = false; this.writestate.compacting = 0; if (this.writestate.writesEnabled) { // Remove temporary data left over from old regions status.setStatus("Cleaning up temporary data from old regions"); fs.cleanupTempDir(); } if (this.writestate.writesEnabled) { status.setStatus("Cleaning up detritus from prior splits"); // Get rid of any splits or merges that were lost in-progress. Clean out // these directories here on open. We may be opening a region that was // being split but we crashed in the middle of it all. fs.cleanupAnySplitDetritus(); fs.cleanupMergesDir(); } // Initialize split policy this.splitPolicy = RegionSplitPolicy.create(this, conf); this.lastFlushTime = EnvironmentEdgeManager.currentTime(); // Use maximum of wal sequenceid or that which was found in stores // (particularly if no recovered edits, seqid will be -1). long nextSeqid = maxSeqId; // In distributedLogReplay mode, we don't know the last change sequence number because region // is opened before recovery completes. So we add a safety bumper to avoid new sequence number // overlaps used sequence numbers if (this.writestate.writesEnabled) { nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs .getRegionDir(), nextSeqid, (this.isRecovering ? 
(this.flushPerChanges + 10000000) : 1)); } else { nextSeqid++; } LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() + "; next sequenceid=" + nextSeqid); // A region can be reopened if failed a split; reset flags this.closing.set(false); this.closed.set(false); if (coprocessorHost != null) { status.setStatus("Running coprocessor post-open hooks"); coprocessorHost.postOpen(); } status.markComplete("Region opened successfully"); return nextSeqid; }
void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException { checkTargetRegion(flush.getEncodedRegionName().toByteArray(), "Flush marker from WAL ", flush); if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return; // if primary nothing to do } if (LOG.isDebugEnabled()) { LOG.debug(getRegionInfo().getEncodedName() + " : " + "Replaying flush marker " + TextFormat.shortDebugString(flush)); } startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close try { FlushAction action = flush.getAction(); switch (action) { case START_FLUSH: replayWALFlushStartMarker(flush); break; case COMMIT_FLUSH: replayWALFlushCommitMarker(flush); break; case ABORT_FLUSH: replayWALFlushAbortMarker(flush); break; case CANNOT_FLUSH: replayWALFlushCannotFlushMarker(flush, replaySeqId); break; default: LOG.warn(getRegionInfo().getEncodedName() + " : " + "Received a flush event with unknown action, ignoring. " + TextFormat.shortDebugString(flush)); break; } logRegionFiles(); } finally { closeRegionOperation(Operation.REPLAY_EVENT); } }
public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable) throws Exception { // tests having edits from a disabled or dropped table is handled correctly by skipping those // entries and further edits after the edits from dropped/disabled table can be replicated // without problems. final TableName tableName = TableName.valueOf(name.getMethodName() + dropTable); HTableDescriptor htd = HTU.createTableDescriptor(tableName); int regionReplication = 3; htd.setRegionReplication(regionReplication); HTU.deleteTableIfAny(tableName); HTU.getAdmin().createTable(htd); TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable"); HTU.deleteTableIfAny(toBeDisabledTable); htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); htd.setRegionReplication(regionReplication); HTU.getAdmin().createTable(htd); // both tables are created, now pause replication ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); // now that the replication is disabled, write to the table to be dropped, then drop the table. 
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); Table table = connection.getTable(tableName); Table tableToBeDisabled = connection.getTable(toBeDisabledTable); HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); AtomicLong skippedEdits = new AtomicLong(); RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE); RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); Entry entry = new Entry( new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), new WALEdit()); HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table if (dropTable) { HTU.getAdmin().deleteTable(toBeDisabledTable); } sinkWriter.append(toBeDisabledTable, encodedRegionName, HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); assertEquals(2, skippedEdits.get()); try { // load some data to the to-be-dropped table // load the data to the table HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); // now enable the replication admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); verifyReplication(tableName, regionReplication, 0, 1000); } finally { admin.close(); table.close(); rl.close(); tableToBeDisabled.close(); HTU.deleteTableIfAny(toBeDisabledTable); connection.close(); } }
/**
 * Return the store file information of the specified family/file.
 *
 * <p>The file path is built by resolving {@code fileName} against the family's store directory
 * and the lookup itself is delegated to {@code ServerRegionReplicaUtil.getStoreFileInfo}.
 *
 * @param familyName Column Family Name
 * @param fileName File Name
 * @return The {@link StoreFileInfo} for the specified family/file
 * @throws IOException propagated from the delegated lookup
 */
StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
    throws IOException {
  final Path storeFilePath = new Path(getStoreDir(familyName), fileName);
  return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo, regionInfoForFs,
      familyName, storeFilePath);
}