/**
 * Times a major compaction of the test table with the throughput controller set to
 * {@link NoLimitCompactionThroughputController}.
 * <p>
 * A mini cluster is started with {@code startMiniCluster(1)} for the duration of the measurement
 * and is always shut down before this method returns.
 *
 * @return milliseconds elapsed between requesting the major compaction and the store reporting
 *         exactly one store file
 * @throws Exception if cluster startup, data preparation, or the compaction request fails, or the
 *         polling sleep is interrupted
 */
private long testCompactionWithoutThroughputLimit() throws Exception {
  Configuration conf = TEST_UTIL.getConfiguration();
  conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
  // NOTE(review): min/max/blocking values presumably keep automatic compactions away while the
  // ten files are being written -- confirm against CompactionConfiguration.
  conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
  conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
  conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
  conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
    NoLimitCompactionThroughputController.class.getName());
  TEST_UTIL.startMiniCluster(1);
  try {
    Store loadedStore = prepareData();
    assertEquals(10, loadedStore.getStorefilesCount());
    long begin = System.currentTimeMillis();
    TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
    // Poll until the ten flushed files have been collapsed into a single one.
    for (;;) {
      if (loadedStore.getStorefilesCount() == 1) {
        break;
      }
      Thread.sleep(20);
    }
    return System.currentTimeMillis() - begin;
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
/**
 * Creates the test table and fills it with ten flushed batches of data.
 * <p>
 * The table uses {@link FIFOCompactionPolicy}, {@link DisabledRegionSplitPolicy}, and a column
 * family with a time-to-live of 1. Each batch consists of ten puts of 128 KiB random values
 * followed by a flush, after which the injected {@link TimeOffsetEnvironmentEdge} is advanced by
 * 1001.
 *
 * @return the store returned by {@code getStoreWithName(tableName)}
 * @throws IOException if table creation, a put, or a flush fails
 */
private HStore prepareData() throws IOException {
  Admin admin = TEST_UTIL.getAdmin();
  TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
    .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      FIFOCompactionPolicy.class.getName())
    .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      DisabledRegionSplitPolicy.class.getName())
    .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build())
    .build();
  admin.createTable(desc);
  TimeOffsetEnvironmentEdge edge =
    (TimeOffsetEnvironmentEdge) EnvironmentEdgeManager.getDelegate();
  // The Table handle is Closeable; release it once loading is finished instead of leaking it.
  try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        byte[] value = new byte[128 * 1024];
        ThreadLocalRandom.current().nextBytes(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
      }
      admin.flush(tableName);
      // NOTE(review): presumably milliseconds, i.e. just past the family TTL -- confirm.
      edge.increment(1001);
    }
  }
  return getStoreWithName(tableName);
}
/**
 * Verifies that creating a FIFO-compaction table whose table-level
 * {@code HStore.BLOCKING_STOREFILES_KEY} is "10" is rejected with a
 * {@link DoNotRetryIOException} carrying the "below recommended minimum of 1000" message.
 */
@Test
public void testSanityCheckBlockingStoreFiles() throws IOException {
  // Register the expected failure before issuing the request that should trigger it.
  error.expect(DoNotRetryIOException.class);
  error.expectMessage("Blocking file count 'hbase.hstore.blockingStoreFiles'");
  error.expectMessage("is below recommended minimum of 1000 for column family");

  TableName tableName = TableName.valueOf(getClass().getSimpleName() + "-BlockingStoreFiles");
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
    .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      FIFOCompactionPolicy.class.getName())
    .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      DisabledRegionSplitPolicy.class.getName())
    .setValue(HStore.BLOCKING_STOREFILES_KEY, "10")
    .addColumnFamily(
      ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build());
  TEST_UTIL.getAdmin().createTable(builder.build());
}
/**
 * Times a major compaction of the test table with the throughput controller set to
 * {@link NoLimitThroughputController}.
 * <p>
 * A mini cluster is started with {@code startMiniCluster(1)} for the duration of the measurement
 * and is always shut down before this method returns.
 *
 * @return milliseconds elapsed between requesting the major compaction and the store reporting
 *         exactly one store file
 * @throws Exception if cluster startup, data preparation, or the compaction request fails, or the
 *         polling sleep is interrupted
 */
private long testCompactionWithoutThroughputLimit() throws Exception {
  Configuration conf = TEST_UTIL.getConfiguration();
  conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
  // NOTE(review): min/max/blocking values presumably keep automatic compactions away while the
  // ten files are being written -- confirm against CompactionConfiguration.
  conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
  conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
  conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
  conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
    NoLimitThroughputController.class.getName());
  TEST_UTIL.startMiniCluster(1);
  try {
    HStore loadedStore = prepareData();
    assertEquals(10, loadedStore.getStorefilesCount());
    long begin = System.currentTimeMillis();
    TEST_UTIL.getAdmin().majorCompact(tableName);
    // Poll until the ten flushed files have been collapsed into a single one.
    for (;;) {
      if (loadedStore.getStorefilesCount() == 1) {
        break;
      }
      Thread.sleep(20);
    }
    return System.currentTimeMillis() - begin;
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
private Store prepareData() throws IOException { HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); } HTableDescriptor desc = new HTableDescriptor(tableName); desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, FIFOCompactionPolicy.class.getName()); desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, DisabledRegionSplitPolicy.class.getName()); HColumnDescriptor colDesc = new HColumnDescriptor(family); colDesc.setTimeToLive(1); // 1 sec desc.addFamily(colDesc); admin.createTable(desc); Table table = TEST_UTIL.getConnection().getTable(tableName); Random rand = new Random(); TimeOffsetEnvironmentEdge edge = (TimeOffsetEnvironmentEdge) EnvironmentEdgeManager.getDelegate(); for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { byte[] value = new byte[128 * 1024]; rand.nextBytes(value); table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); } admin.flush(tableName); edge.increment(1001); } return getStoreWithName(tableName); }
/**
 * Verifies that creating a FIFO-compaction table whose column family has no TTL set is rejected:
 * {@code createTable} must throw, otherwise the test fails.
 */
@Test
public void testSanityCheckTTL() throws Exception {
  Configuration conf = TEST_UTIL.getConfiguration();
  conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
  TEST_UTIL.startMiniCluster(1);
  // Everything after startup runs under the finally so the cluster is shut down even when the
  // setup steps (not just createTable) fail.
  try {
    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
    String tableName = this.tableName.getNameAsString() + "-TTL";
    if (admin.tableExists(tableName)) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      FIFOCompactionPolicy.class.getName());
    desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      DisabledRegionSplitPolicy.class.getName());
    // The family's TTL is deliberately left untouched.
    HColumnDescriptor colDesc = new HColumnDescriptor(family);
    desc.addFamily(colDesc);
    try {
      admin.createTable(desc);
      Assert.fail();
    } catch (Exception expected) {
      // Expected: the table must be rejected. Assert.fail() raises an AssertionError, which is
      // not an Exception and therefore is not swallowed here.
    }
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
@Test public void testSanityCheckMinVersion() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); TEST_UTIL.startMiniCluster(1); HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); String tableName = this.tableName.getNameAsString()+"-MinVersion"; if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); } HTableDescriptor desc = new HTableDescriptor(tableName); desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, FIFOCompactionPolicy.class.getName()); desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, DisabledRegionSplitPolicy.class.getName()); HColumnDescriptor colDesc = new HColumnDescriptor(family); colDesc.setTimeToLive(1); // 1 sec colDesc.setMinVersions(1); desc.addFamily(colDesc); try{ admin.createTable(desc); Assert.fail(); }catch(Exception e){ }finally{ TEST_UTIL.shutdownMiniCluster(); } }
@Test public void testSanityCheckBlockingStoreFiles() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10); TEST_UTIL.startMiniCluster(1); HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); String tableName = this.tableName.getNameAsString()+"-MinVersion"; if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); } HTableDescriptor desc = new HTableDescriptor(tableName); desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, FIFOCompactionPolicy.class.getName()); desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, DisabledRegionSplitPolicy.class.getName()); HColumnDescriptor colDesc = new HColumnDescriptor(family); colDesc.setTimeToLive(1); // 1 sec desc.addFamily(colDesc); try{ admin.createTable(desc); Assert.fail(); }catch(Exception e){ }finally{ TEST_UTIL.shutdownMiniCluster(); } }
private long testCompactionWithThroughputLimit() throws Exception { long throughputLimit = 1024L * 1024; Configuration conf = TEST_UTIL.getConfiguration(); conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); conf.setLong( PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, throughputLimit); conf.setLong( PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, throughputLimit); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, PressureAwareCompactionThroughputController.class.getName()); TEST_UTIL.startMiniCluster(1); try { Store store = prepareData(); assertEquals(10, store.getStorefilesCount()); long startTime = System.currentTimeMillis(); TEST_UTIL.getHBaseAdmin().majorCompact(tableName); while (store.getStorefilesCount() != 1) { Thread.sleep(20); } long duration = System.currentTimeMillis() - startTime; double throughput = (double) store.getStorefilesSize() / duration * 1000; // confirm that the speed limit work properly(not too fast, and also not too slow) // 20% is the max acceptable error rate. assertTrue(throughput < throughputLimit * 1.2); assertTrue(throughput > throughputLimit * 0.8); return System.currentTimeMillis() - startTime; } finally { TEST_UTIL.shutdownMiniCluster(); } }
/**
 * Verifies that creating a FIFO-compaction table whose column family is built with
 * {@code ColumnFamilyDescriptorBuilder.of(family)} (no TTL set) is rejected with a
 * {@link DoNotRetryIOException} mentioning "Default TTL is not supported".
 */
@Test
public void testSanityCheckTTL() throws IOException {
  // Register the expected failure before issuing the request that should trigger it.
  error.expect(DoNotRetryIOException.class);
  error.expectMessage("Default TTL is not supported");

  TableName tableName = TableName.valueOf(getClass().getSimpleName() + "-TTL");
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
    .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      FIFOCompactionPolicy.class.getName())
    .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      DisabledRegionSplitPolicy.class.getName())
    .addColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
  TEST_UTIL.getAdmin().createTable(builder.build());
}
/**
 * Verifies that creating a FIFO-compaction table whose column family sets min versions to 1 is
 * rejected with a {@link DoNotRetryIOException} mentioning
 * "MIN_VERSION > 0 is not supported for FIFO compaction".
 */
@Test
public void testSanityCheckMinVersion() throws IOException {
  // Register the expected failure before issuing the request that should trigger it.
  error.expect(DoNotRetryIOException.class);
  error.expectMessage("MIN_VERSION > 0 is not supported for FIFO compaction");

  TableName tableName = TableName.valueOf(getClass().getSimpleName() + "-MinVersion");
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
    .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      FIFOCompactionPolicy.class.getName())
    .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      DisabledRegionSplitPolicy.class.getName())
    .addColumnFamily(
      ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).setMinVersions(1).build());
  TEST_UTIL.getAdmin().createTable(builder.build());
}
private long testCompactionWithThroughputLimit() throws Exception { long throughputLimit = 1024L * 1024; Configuration conf = TEST_UTIL.getConfiguration(); conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); conf.setLong( PressureAwareCompactionThroughputController .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, throughputLimit); conf.setLong( PressureAwareCompactionThroughputController .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, throughputLimit); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, PressureAwareCompactionThroughputController.class.getName()); TEST_UTIL.startMiniCluster(1); try { HStore store = prepareData(); assertEquals(10, store.getStorefilesCount()); long startTime = System.currentTimeMillis(); TEST_UTIL.getAdmin().majorCompact(tableName); while (store.getStorefilesCount() != 1) { Thread.sleep(20); } long duration = System.currentTimeMillis() - startTime; double throughput = (double) store.getStorefilesSize() / duration * 1000; // confirm that the speed limit work properly(not too fast, and also not too slow) // 20% is the max acceptable error rate. assertTrue(throughput < throughputLimit * 1.2); assertTrue(throughput > throughputLimit * 0.8); return System.currentTimeMillis() - startTime; } finally { TEST_UTIL.shutdownMiniCluster(); } }
/**
 * One-time class setup: applies the MOB-related settings (info ports, HFile format version 3,
 * {@link HMobRegion}, {@link DefaultMobStoreFlusher}, {@link MobMasterObserver}) to the shared
 * test configuration and then starts the mini cluster with {@code startMiniCluster(1)}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Fully qualified so this block does not depend on an import being present in the file.
  org.apache.hadoop.conf.Configuration settings = TEST_UTIL.getConfiguration();

  settings.setInt("hbase.master.info.port", 0);
  settings.setBoolean("hbase.regionserver.info.port.auto", true);

  settings.setInt("hfile.format.version", 3);
  settings.setClass("hbase.hregion.impl", HMobRegion.class, HRegion.class);
  settings.setClass(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
    DefaultMobStoreFlusher.class, DefaultStoreFlusher.class);
  settings.setClass("hbase.coprocessor.master.classes", MobMasterObserver.class,
    BaseMasterObserver.class);

  TEST_UTIL.startMiniCluster(1);
}
/**
 * One-time class setup: applies the MOB-related settings (info ports, HFile format version 3,
 * {@link HMobRegion}, {@link DefaultMobStoreFlusher}) to the shared test configuration, then
 * starts the mini cluster followed by the mini MapReduce cluster.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Fully qualified so this block does not depend on an import being present in the file.
  org.apache.hadoop.conf.Configuration settings = TEST_UTIL.getConfiguration();

  settings.setInt("hbase.master.info.port", 0);
  settings.setBoolean("hbase.regionserver.info.port.auto", true);

  settings.setInt("hfile.format.version", 3);
  settings.setClass("hbase.hregion.impl", HMobRegion.class, HRegion.class);
  settings.setClass(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
    DefaultMobStoreFlusher.class, DefaultStoreFlusher.class);

  TEST_UTIL.startMiniCluster();
  TEST_UTIL.startMiniMapReduceCluster();
}
/**
 * One-time class setup: applies the MOB-related settings (info ports, HFile format version 3,
 * {@link HMobRegion}, {@link DefaultMobStoreFlusher}) to the shared test configuration and then
 * starts the mini cluster with {@code startMiniCluster(1)}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Fully qualified so this block does not depend on an import being present in the file.
  org.apache.hadoop.conf.Configuration settings = TEST_UTIL.getConfiguration();

  settings.setInt("hbase.master.info.port", 0);
  settings.setBoolean("hbase.regionserver.info.port.auto", true);

  settings.setInt("hfile.format.version", 3);
  settings.setClass("hbase.hregion.impl", HMobRegion.class, HRegion.class);
  settings.setClass(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
    DefaultMobStoreFlusher.class, DefaultStoreFlusher.class);

  TEST_UTIL.startMiniCluster(1);
}
/**
 * One-time class setup: applies the MOB-related settings (info ports, HFile format version 3,
 * {@link HMobRegion}, {@link DefaultMobStoreFlusher}) to the shared test configuration. No
 * cluster is started here.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Fully qualified so this block does not depend on an import being present in the file.
  org.apache.hadoop.conf.Configuration settings = TEST_UTIL.getConfiguration();

  settings.setInt("hbase.master.info.port", 0);
  settings.setBoolean("hbase.regionserver.info.port.auto", true);

  settings.setInt("hfile.format.version", 3);
  settings.setClass("hbase.hregion.impl", HMobRegion.class, HRegion.class);
  settings.setClass(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
    DefaultMobStoreFlusher.class, DefaultStoreFlusher.class);
}
private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) throws IOException { // FIFO compaction has some requirements // Actually FCP ignores periodic major compactions String className = htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY); if (className == null) { className = conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, ExploringCompactionPolicy.class.getName()); } int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT; String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY); if (sv != null) { blockingFileCount = Integer.parseInt(sv); } else { blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount); } for (HColumnDescriptor hcd : htd.getColumnFamilies()) { String compactionPolicy = hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY); if (compactionPolicy == null) { compactionPolicy = className; } if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) { continue; } // FIFOCompaction String message = null; // 1. Check TTL if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) { message = "Default TTL is not supported for FIFO compaction"; throw new IOException(message); } // 2. Check min versions if (hcd.getMinVersions() > 0) { message = "MIN_VERSION > 0 is not supported for FIFO compaction"; throw new IOException(message); } // 3. blocking file count String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY); if (sbfc != null) { blockingFileCount = Integer.parseInt(sbfc); } if (blockingFileCount < 1000) { message = "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount + " is below recommended minimum of 1000"; throw new IOException(message); } } }
/**
 * Test the tuning task of {@link PressureAwareCompactionThroughputController}.
 * <p>
 * With bounds of 10 MiB/s and 20 MiB/s, a compaction minimum of 4 files and a blocking count of
 * 6, the test flushes store files one by one (compaction is disabled on the table) and checks
 * that {@code maxThroughput} moves from the lower bound to 15 MiB/s, to the higher bound, and
 * finally to {@code Double.MAX_VALUE}. It then switches the controller class to
 * {@link NoLimitCompactionThroughputController} via {@code onConfigurationChange} and checks that
 * the old controller is stopped and replaced.
 */
@Test
public void testThroughputTuning() throws Exception {
  Configuration conf = TEST_UTIL.getConfiguration();
  conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
  conf.setLong(
    PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
    20L * 1024 * 1024);
  conf.setLong(
    PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
    10L * 1024 * 1024);
  conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
  conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
  conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
    PressureAwareCompactionThroughputController.class.getName());
  conf.setInt(
    PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
    1000);
  TEST_UTIL.startMiniCluster(1);
  // The connection is opened inside the try so that a failure to create or close it can no
  // longer skip the cluster shutdown; it is closed before the finally block runs.
  try (Connection conn = ConnectionFactory.createConnection(conf)) {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(family));
    htd.setCompactionEnabled(false);
    TEST_UTIL.getHBaseAdmin().createTable(htd);
    TEST_UTIL.waitTableAvailable(tableName);
    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
    PressureAwareCompactionThroughputController throughputController =
      (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
        .getCompactionThroughputController();
    assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

    // The Table handle is Closeable; release it once the writes are done.
    try (Table table = conn.getTable(tableName)) {
      for (int i = 0; i < 5; i++) {
        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
      // Sleep for two tune periods (configured as 1000 above) before sampling.
      Thread.sleep(2000);
      assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);
    }

    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitCompactionThroughputController.class.getName());
    regionServer.compactSplitThread.onConfigurationChange(conf);
    assertTrue(throughputController.isStopped());
    assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
      instanceof NoLimitCompactionThroughputController);
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
private void checkCompactionPolicy(Configuration conf, TableDescriptor htd) throws IOException { // FIFO compaction has some requirements // Actually FCP ignores periodic major compactions String className = htd.getValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY); if (className == null) { className = conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, ExploringCompactionPolicy.class.getName()); } int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT; String sv = htd.getValue(HStore.BLOCKING_STOREFILES_KEY); if (sv != null) { blockingFileCount = Integer.parseInt(sv); } else { blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount); } for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { String compactionPolicy = hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY); if (compactionPolicy == null) { compactionPolicy = className; } if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) { continue; } // FIFOCompaction String message = null; // 1. Check TTL if (hcd.getTimeToLive() == ColumnFamilyDescriptorBuilder.DEFAULT_TTL) { message = "Default TTL is not supported for FIFO compaction"; throw new IOException(message); } // 2. Check min versions if (hcd.getMinVersions() > 0) { message = "MIN_VERSION > 0 is not supported for FIFO compaction"; throw new IOException(message); } // 3. blocking file count sv = hcd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY); if (sv != null) { blockingFileCount = Integer.parseInt(sv); } if (blockingFileCount < 1000) { message = "Blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount + " is below recommended minimum of 1000 for column family "+ hcd.getNameAsString(); throw new IOException(message); } } }
/**
 * Test the tuning task of {@link PressureAwareFlushThroughputController}.
 * <p>
 * After flushing every region the flush pressure is expected to be 0. The test then writes
 * 100 rows of 256 KiB each and checks that the controller's max throughput equals
 * {@code 10 MiB * (1 + flushPressure)}. Finally it switches the controller class to
 * {@link NoLimitThroughputController} via {@code onConfigurationChange} and checks that the old
 * controller is stopped and replaced.
 */
@Test
public void testFlushThroughputTuning() throws Exception {
  Configuration conf = hbtu.getConfiguration();
  setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024);
  conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
  conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
    3000);
  hbtu.startMiniCluster(1);
  // try-with-resources closes the connection on every exit path, including assertion failures;
  // previously it was only closed when the whole test passed.
  try (Connection conn = ConnectionFactory.createConnection(conf)) {
    hbtu.getAdmin().createTable(TableDescriptorBuilder.newBuilder(tableName)
      .addColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
      .build());
    hbtu.waitTableAvailable(tableName);
    HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName);
    PressureAwareFlushThroughputController throughputController =
      (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
    for (HRegion region : regionServer.getRegions()) {
      region.flush(true);
    }
    assertEquals(0.0, regionServer.getFlushPressure(), EPSILON);
    // Sleep longer than the tune period (configured as 3000 above) before sampling.
    Thread.sleep(5000);
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(hbtu.getConfiguration());
    if (tablesOnMaster) {
      // If no tables on the master, this math is off and I'm not sure what it is supposed to be
      // when meta is on the regionserver and not on the master.
      assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
    }
    // The Table handle is Closeable; release it once the writes are done.
    try (Table table = conn.getTable(tableName)) {
      Random rand = new Random();
      for (int i = 0; i < 10; i++) {
        for (int j = 0; j < 10; j++) {
          byte[] value = new byte[256 * 1024];
          rand.nextBytes(value);
          table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
        }
      }
    }
    Thread.sleep(5000);
    double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
    assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);

    conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    regionServer.onConfigurationChange(conf);
    assertTrue(throughputController.isStopped());
    assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
  }
}