/** * Start archiving table for given hfile cleaner * @param tableName table to archive * @param cleaner cleaner to check to make sure change propagated * @return underlying {@link LongTermArchivingHFileCleaner} that is managing archiving * @throws IOException on failure * @throws KeeperException on failure */ private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner) throws IOException, KeeperException { // turn on hfile retention LOG.debug("----Starting archiving for table:" + tableName); archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName)); assertTrue("Archving didn't get turned on", archivingClient.getArchivingEnabled(tableName)); // wait for the archiver to get the notification List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting(); LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) { // spin until propagation - should be fast } return cleaners; }
@Override public void setConf(Configuration config) { // If either replication or replication of bulk load hfiles is disabled, keep all members null if (!(config.getBoolean( HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) { LOG.warn(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is not enabled. Better to remove " + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS + " configuration."); return; } // Make my own Configuration. Then I'll have my own connection to zk that // I can close myself when time comes. Configuration conf = new Configuration(config); try { setConf(conf, new ZKWatcher(conf, "replicationHFileCleaner", null)); } catch (IOException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } }
/**
 * Modifies the master's configuration in order to inject replication-related cleaner plugins.
 * <p>
 * {@link ReplicationLogCleaner} is always appended to the log cleaner plugin list, and
 * {@link ReplicationHFileCleaner} is appended to the hfile cleaner plugin list when
 * {@code ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)} is true. A plugin already
 * present in its list is not added again.
 * @param conf master configuration to update in place
 */
@VisibleForTesting
public static void decorateMasterConfiguration(Configuration conf) {
  addPluginIfMissing(conf, HBASE_MASTER_LOGCLEANER_PLUGINS,
      ReplicationLogCleaner.class.getCanonicalName());
  if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
    addPluginIfMissing(conf, HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
        ReplicationHFileCleaner.class.getCanonicalName());
  }
}

/**
 * Appends {@code cleanerClass} to the comma-separated plugin list stored under {@code key}
 * unless the list already contains it.
 * @param conf configuration holding the plugin list
 * @param key configuration key of the plugin list
 * @param cleanerClass canonical class name of the plugin to add
 */
private static void addPluginIfMissing(Configuration conf, String key, String cleanerClass) {
  String plugins = conf.get(key);
  if (plugins == null || plugins.isEmpty()) {
    // The key may be unset (conf.get returns null) or blank; start the list with this plugin
    // instead of failing with a NullPointerException or writing a leading comma.
    conf.set(key, cleanerClass);
    return;
  }
  if (!plugins.contains(cleanerClass)) {
    conf.set(key, plugins + "," + cleanerClass);
  }
}
private void startServiceThreads() throws IOException{ // Start the executor service pools this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt("hbase.master.executor.openregion.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt("hbase.master.executor.closeregion.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt("hbase.master.executor.logreplayops.threads", 10)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of // tables. // Any time changing this maxThreads to > 1, pls see the comment at // AccessController#postCreateTableHandler this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); startProcedureExecutor(); // Start log cleaner thread int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000); this.logCleaner = new LogCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), getMasterFileSystem().getOldLogDir()); getChoreService().scheduleChore(logCleaner); //start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() .getFileSystem(), archiveDir); getChoreService().scheduleChore(hfileCleaner); serviceStarted = true; if (LOG.isTraceEnabled()) { LOG.trace("Started service threads"); } }
@Test (timeout=300000) public void testArchivingOnSingleTable() throws Exception { createArchiveDirectory(); FileSystem fs = UTIL.getTestFileSystem(); Path archiveDir = getArchiveDir(); Path tableDir = getTableDir(STRING_TABLE_NAME); toCleanup.add(archiveDir); toCleanup.add(tableDir); Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); loadFlushAndCompact(region, TEST_FAM); // get the current hfiles in the archive directory List<Path> files = getAllFiles(fs, archiveDir); if (files == null) { FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG); throw new RuntimeException("Didn't archive any files!"); } CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size()); runCleaner(cleaner, finished, stop); // know the cleaner ran, so now check all the files again to make sure they are still there List<Path> archivedFiles = getAllFiles(fs, archiveDir); assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles); // but we still have the archive directory assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()))); }
/** * @param cleaner */ private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop) throws InterruptedException { final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME"); // run the cleaner choreService.scheduleChore(cleaner); // wait for the cleaner to check all the files finished.await(); // stop the cleaner stop.stop(""); }
@Test (timeout=300000) public void testArchivingOnSingleTable() throws Exception { createArchiveDirectory(); FileSystem fs = UTIL.getTestFileSystem(); Path archiveDir = getArchiveDir(); Path tableDir = getTableDir(STRING_TABLE_NAME); toCleanup.add(archiveDir); toCleanup.add(tableDir); Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); loadFlushAndCompact(region, TEST_FAM); // get the current hfiles in the archive directory List<Path> files = getAllFiles(fs, archiveDir); if (files == null) { FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG); throw new RuntimeException("Didn't archive any files!"); } CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size()); runCleaner(cleaner, finished, stop); // know the cleaner ran, so now check all the files again to make sure they are still there List<Path> archivedFiles = getAllFiles(fs, archiveDir); assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles); // but we still have the archive directory assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()))); }
/** * @param cleaner */ private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop) throws InterruptedException { // run the cleaner cleaner.start(); // wait for the cleaner to check all the files finished.await(); // stop the cleaner stop.stop(""); }
private void startServiceThreads() throws IOException{ // Start the executor service pools this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt("hbase.master.executor.openregion.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt("hbase.master.executor.closeregion.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt("hbase.master.executor.logreplayops.threads", 10)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of // tables. this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); // Start log cleaner thread int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000); this.logCleaner = new LogCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), getMasterFileSystem().getOldLogDir()); Threads.setDaemonThreadRunning(logCleaner.getThread(), getName() + ".oldLogCleaner"); //start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() .getFileSystem(), archiveDir); Threads.setDaemonThreadRunning(hfileCleaner.getThread(), getName() + ".archivedHFileCleaner"); serviceStarted = true; if (LOG.isTraceEnabled()) { LOG.trace("Started service threads"); } }
@Test public void testArchivingOnSingleTable() throws Exception { createArchiveDirectory(); FileSystem fs = UTIL.getTestFileSystem(); Path archiveDir = getArchiveDir(); Path tableDir = getTableDir(STRING_TABLE_NAME); toCleanup.add(archiveDir); toCleanup.add(tableDir); Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); loadFlushAndCompact(region, TEST_FAM); // get the current hfiles in the archive directory List<Path> files = getAllFiles(fs, archiveDir); if (files == null) { FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG); throw new RuntimeException("Didn't archive any files!"); } CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size()); runCleaner(cleaner, finished, stop); // know the cleaner ran, so now check all the files again to make sure they are still there List<Path> archivedFiles = getAllFiles(fs, archiveDir); assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles); // but we still have the archive directory assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()))); }
/**
 * @return the {@link HFileCleaner} held in the {@code hfileCleaner} field
 */
public HFileCleaner getHFileCleaner() {
  return hfileCleaner;
}
/** * Called at startup, to verify if snapshot operation is supported, and to avoid * starting the master if there're snapshots present but the cleaners needed are missing. * Otherwise we can end up with snapshot data loss. * @param conf The {@link Configuration} object to use * @param mfs The MasterFileSystem to use * @throws IOException in case of file-system operation failure * @throws UnsupportedOperationException in case cleaners are missing and * there're snapshot in the system */ private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs) throws IOException, UnsupportedOperationException { // Verify if snapshot is disabled by the user String enabled = conf.get(HBASE_SNAPSHOT_ENABLED); boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false); boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled); // Extract cleaners from conf Set<String> hfileCleaners = new HashSet<String>(); String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); if (cleaners != null) Collections.addAll(hfileCleaners, cleaners); Set<String> logCleaners = new HashSet<String>(); cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS); if (cleaners != null) Collections.addAll(logCleaners, cleaners); // check if an older version of snapshot directory was present Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME); FileSystem fs = mfs.getFileSystem(); List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir)); if (ss != null && !ss.isEmpty()) { LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir); LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME); } // If the user has enabled the snapshot, we force the cleaners to be present // otherwise we still need to check if cleaners are enabled or not and verify // that there're no snapshot in the .snapshot folder. 
if (snapshotEnabled) { // Inject snapshot cleaners, if snapshot.enable is true hfileCleaners.add(SnapshotHFileCleaner.class.getName()); hfileCleaners.add(HFileLinkCleaner.class.getName()); logCleaners.add(SnapshotLogCleaner.class.getName()); // Set cleaners conf conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, hfileCleaners.toArray(new String[hfileCleaners.size()])); conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, logCleaners.toArray(new String[logCleaners.size()])); } else { // Verify if cleaners are present snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) && hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) && hfileCleaners.contains(HFileLinkCleaner.class.getName()); // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set. if (snapshotEnabled) { LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " + (userDisabled ? "is set to 'false'." : "is not set.")); } } // Mark snapshot feature as enabled if cleaners are present and user has not disabled it. this.isSnapshotSupported = snapshotEnabled && !userDisabled; // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder // otherwise we end up with snapshot data loss. if (!snapshotEnabled) { LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners."); Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir()); if (fs.exists(snapshotDir)) { FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir, new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs)); if (snapshots != null) { LOG.error("Snapshots are present, but cleaners are not enabled."); checkSnapshotSupport(); } } } }
/**
 * Registers {@link LongTermArchivingHFileCleaner} as the hfile cleaner plugin in the given
 * configuration and builds an {@link HFileCleaner} from it.
 * @param conf configuration to update and to hand to the cleaner
 * @param fs file system passed to the cleaner
 * @param archiveDir directory passed to the cleaner
 * @param stop stoppable passed to the cleaner
 * @return the newly created cleaner, constructed with 1000 as its first argument
 */
private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
  String delegateClassName = LongTermArchivingHFileCleaner.class.getCanonicalName();
  conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, delegateClassName);
  HFileCleaner cleaner = new HFileCleaner(1000, stop, conf, fs, archiveDir);
  return cleaner;
}
/** * Test HFileArchiver.resolveAndArchive() race condition HBASE-7643 */ @Test public void testCleaningRace() throws Exception { final long TEST_TIME = 20 * 1000; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration(); Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace"); FileSystem fs = UTIL.getTestFileSystem(); Path archiveDir = new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY); Path regionDir = new Path(FSUtils.getTableDir(new Path("./"), TableName.valueOf("table")), "abcdef"); Path familyDir = new Path(regionDir, "cf"); Path sourceRegionDir = new Path(rootDir, regionDir); fs.mkdirs(sourceRegionDir); Stoppable stoppable = new StoppableImplementation(); // The cleaner should be looping without long pauses to reproduce the race condition. HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir); try { choreService.scheduleChore(cleaner); // Keep creating/archiving new files while the cleaner is running in the other thread long startTime = System.currentTimeMillis(); for (long fid = 0; (System.currentTimeMillis() - startTime) < TEST_TIME; ++fid) { Path file = new Path(familyDir, String.valueOf(fid)); Path sourceFile = new Path(rootDir, file); Path archiveFile = new Path(archiveDir, file); fs.createNewFile(sourceFile); try { // Try to archive the file HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir); // The archiver succeded, the file is no longer in the original location // but it's in the archive location. LOG.debug("hfile=" + fid + " should be in the archive"); assertTrue(fs.exists(archiveFile)); assertFalse(fs.exists(sourceFile)); } catch (IOException e) { // The archiver is unable to archive the file. Probably HBASE-7643 race condition. // in this case, the file should not be archived, and we should have the file // in the original location. 
LOG.debug("hfile=" + fid + " should be in the source location"); assertFalse(fs.exists(archiveFile)); assertTrue(fs.exists(sourceFile)); // Avoid to have this file in the next run fs.delete(sourceFile, false); } } } finally { stoppable.stop("test end"); cleaner.cancel(true); choreService.shutdown(); fs.delete(rootDir, true); } }
private void startServiceThreads() throws IOException{ // Start the executor service pools this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt("hbase.master.executor.openregion.threads", 5)); this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt("hbase.master.executor.closeregion.threads", 5)); this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 3)); this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of // tables. this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); // Start log cleaner thread String n = Thread.currentThread().getName(); int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000); this.logCleaner = new LogCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), getMasterFileSystem().getOldLogDir()); Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner"); //start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() .getFileSystem(), archiveDir); Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner"); // Start the health checker if (this.healthCheckChore != null) { Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker"); } // Start allowing requests to happen. this.rpcServer.openServer(); if (LOG.isDebugEnabled()) { LOG.debug("Started service threads"); } }
/** * Verify the snapshot support based on the configuration. */ @Test public void testSnapshotSupportConfiguration() throws Exception { // No configuration (no cleaners, not enabled): snapshot feature disabled Configuration conf = new Configuration(); SnapshotManager manager = getNewManager(conf); assertFalse("Snapshot should be disabled with no configuration", isSnapshotSupported(manager)); // force snapshot feature to be enabled conf = new Configuration(); conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); manager = getNewManager(conf); assertTrue("Snapshot should be enabled", isSnapshotSupported(manager)); // force snapshot feature to be disabled conf = new Configuration(); conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, false); manager = getNewManager(conf); assertFalse("Snapshot should be disabled", isSnapshotSupported(manager)); // force snapshot feature to be disabled, even if cleaners are present conf = new Configuration(); conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, SnapshotHFileCleaner.class.getName(), HFileLinkCleaner.class.getName()); conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, SnapshotLogCleaner.class.getName()); conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, false); manager = getNewManager(conf); assertFalse("Snapshot should be disabled", isSnapshotSupported(manager)); // cleaners are present, but missing snapshot enabled property conf = new Configuration(); conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, SnapshotHFileCleaner.class.getName(), HFileLinkCleaner.class.getName()); conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, SnapshotLogCleaner.class.getName()); manager = getNewManager(conf); assertTrue("Snapshot should be enabled, because cleaners are present", isSnapshotSupported(manager)); // Create a "test snapshot" Path rootDir = UTIL.getDataTestDir(); Path testSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir( "testSnapshotSupportConfiguration", rootDir); 
fs.mkdirs(testSnapshotDir); try { // force snapshot feature to be disabled, but snapshots are present conf = new Configuration(); conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, false); manager = getNewManager(conf); fail("Master should not start when snapshot is disabled, but snapshots are present"); } catch (UnsupportedOperationException e) { // expected } finally { fs.delete(testSnapshotDir, true); } }
/** * Test HFileArchiver.resolveAndArchive() race condition HBASE-7643 */ @Test public void testCleaningRace() throws Exception { final long TEST_TIME = 20 * 1000; Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration(); Path rootDir = UTIL.getDataTestDir("testCleaningRace"); FileSystem fs = UTIL.getTestFileSystem(); Path archiveDir = new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY); Path regionDir = new Path("table", "abcdef"); Path familyDir = new Path(regionDir, "cf"); Path sourceRegionDir = new Path(rootDir, regionDir); fs.mkdirs(sourceRegionDir); Stoppable stoppable = new StoppableImplementation(); // The cleaner should be looping without long pauses to reproduce the race condition. HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir); try { cleaner.start(); // Keep creating/archiving new files while the cleaner is running in the other thread long startTime = System.currentTimeMillis(); for (long fid = 0; (System.currentTimeMillis() - startTime) < TEST_TIME; ++fid) { Path file = new Path(familyDir, String.valueOf(fid)); Path sourceFile = new Path(rootDir, file); Path archiveFile = new Path(archiveDir, file); fs.createNewFile(sourceFile); try { // Try to archive the file HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir); // The archiver succeded, the file is no longer in the original location // but it's in the archive location. LOG.debug("hfile=" + fid + " should be in the archive"); assertTrue(fs.exists(archiveFile)); assertFalse(fs.exists(sourceFile)); } catch (IOException e) { // The archiver is unable to archive the file. Probably HBASE-7643 race condition. // in this case, the file should not be archived, and we should have the file // in the original location. 
LOG.debug("hfile=" + fid + " should be in the source location"); assertFalse(fs.exists(archiveFile)); assertTrue(fs.exists(sourceFile)); // Avoid to have this file in the next run fs.delete(sourceFile, false); } } } finally { stoppable.stop("test end"); cleaner.join(); fs.delete(rootDir, true); } }
private void startServiceThreads() throws IOException { // Start the executor service pools this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt("hbase.master.executor.openregion.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt("hbase.master.executor.closeregion.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, conf.getInt("hbase.master.executor.serverops.threads", 5)); this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt("hbase.master.executor.logreplayops.threads", 10)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of // tables. // Any time changing this maxThreads to > 1, pls see the comment at // AccessController#postCreateTableHandler this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); // Start log cleaner thread int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000); this.logCleaner = new LogCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), getMasterFileSystem().getOldLogDir()); Threads.setDaemonThreadRunning(logCleaner.getThread(), getServerName().toShortString() + ".oldLogCleaner"); //start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() .getFileSystem(), archiveDir); Threads.setDaemonThreadRunning(hfileCleaner.getThread(), getServerName().toShortString() + ".archivedHFileCleaner"); serviceStarted = true; if (LOG.isTraceEnabled()) { LOG.trace("Started service threads"); } }
/** * Test HFileArchiver.resolveAndArchive() race condition HBASE-7643 */ @Test public void testCleaningRace() throws Exception { final long TEST_TIME = 20 * 1000; Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration(); Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace"); FileSystem fs = UTIL.getTestFileSystem(); Path archiveDir = new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY); Path regionDir = new Path(FSUtils.getTableDir(new Path("./"), TableName.valueOf("table")), "abcdef"); Path familyDir = new Path(regionDir, "cf"); Path sourceRegionDir = new Path(rootDir, regionDir); fs.mkdirs(sourceRegionDir); Stoppable stoppable = new StoppableImplementation(); // The cleaner should be looping without long pauses to reproduce the race condition. HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir); try { cleaner.start(); // Keep creating/archiving new files while the cleaner is running in the other thread long startTime = System.currentTimeMillis(); for (long fid = 0; (System.currentTimeMillis() - startTime) < TEST_TIME; ++fid) { Path file = new Path(familyDir, String.valueOf(fid)); Path sourceFile = new Path(rootDir, file); Path archiveFile = new Path(archiveDir, file); fs.createNewFile(sourceFile); try { // Try to archive the file HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir); // The archiver succeded, the file is no longer in the original location // but it's in the archive location. LOG.debug("hfile=" + fid + " should be in the archive"); assertTrue(fs.exists(archiveFile)); assertFalse(fs.exists(sourceFile)); } catch (IOException e) { // The archiver is unable to archive the file. Probably HBASE-7643 race condition. // in this case, the file should not be archived, and we should have the file // in the original location. 
LOG.debug("hfile=" + fid + " should be in the source location"); assertFalse(fs.exists(archiveFile)); assertTrue(fs.exists(sourceFile)); // Avoid to have this file in the next run fs.delete(sourceFile, false); } } } finally { stoppable.stop("test end"); cleaner.join(); fs.delete(rootDir, true); } }
/**
 * Starts the master's executor service pools, starts the log cleaner, the archived-hfile
 * cleaner and (when configured) the health checker as daemon threads named after the current
 * thread, then opens the RPC server and records that it is open.
 * @throws IOException declared for the calls made while starting the services
 */
void startServiceThreads() throws IOException {
  // Executor pools: each size comes from the configuration, with the default given here.
  int openRegionThreads = conf.getInt("hbase.master.executor.openregion.threads", 5);
  executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, openRegionThreads);

  int closeRegionThreads = conf.getInt("hbase.master.executor.closeregion.threads", 5);
  executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, closeRegionThreads);

  int serverOpsThreads = conf.getInt("hbase.master.executor.serverops.threads", 5);
  executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, serverOpsThreads);

  // NOTE(review): the meta server operations pool reads the same key as the pool above;
  // confirm that sharing the key is intended.
  int metaServerOpsThreads = conf.getInt("hbase.master.executor.serverops.threads", 5);
  executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
      metaServerOpsThreads);

  int logReplayThreads = conf.getInt("hbase.master.executor.logreplayops.threads", 10);
  executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, logReplayThreads);

  // Table operations rely on exactly one instance of this executor running at a time;
  // concurrency would need fencing of enable/disable of tables.
  // Before raising maxThreads above 1, see the comment at
  // AccessController#postCreateTableHandler
  executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);

  // Log cleaner thread
  String threadNamePrefix = Thread.currentThread().getName();
  int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
  logCleaner = new LogCleaner(
      cleanerInterval,
      this,
      conf,
      getMasterFileSystem().getFileSystem(),
      getMasterFileSystem().getOldLogDir());
  Threads.setDaemonThreadRunning(logCleaner.getThread(), threadNamePrefix + ".oldLogCleaner");

  // Archived hfile cleaner thread, sharing the interval of the log cleaner
  Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
  hfileCleaner = new HFileCleaner(
      cleanerInterval,
      this,
      conf,
      getMasterFileSystem().getFileSystem(),
      archiveDir);
  Threads.setDaemonThreadRunning(hfileCleaner.getThread(),
      threadNamePrefix + ".archivedHFileCleaner");

  // Health checker thread, only when a health check chore exists
  if (healthCheckChore != null) {
    Threads.setDaemonThreadRunning(healthCheckChore.getThread(),
        threadNamePrefix + ".healthChecker");
  }

  // Start allowing requests to happen.
  rpcServer.openServer();
  rpcServerOpen = true;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Started service threads");
  }
}