/**
 * Caches a single 5 KB block under one key and then lets {@code numThreads} reader threads
 * fetch that key over and over, comparing the returned bytes against the original buffer on
 * every read. The calling thread polls until at least {@code numQueries} reads have completed
 * or the test context reports that it should no longer run, and then stops the context.
 *
 * @param toBeTested cache implementation under test
 * @param BlockSize not read by this method
 * @param numThreads number of reader threads to start
 * @param numQueries number of completed reads to wait for before stopping
 * @throws Exception if any cache call, test-context call or the polling sleep throws
 */
public static void hammerSingleKey(final BlockCache toBeTested, int BlockSize, int numThreads,
    int numQueries) throws Exception {
  final BlockCacheKey sharedKey = new BlockCacheKey("key", 0);
  final byte[] expectedBytes = new byte[5 * 1024];
  Arrays.fill(expectedBytes, (byte) 5);
  final ByteArrayCacheable cachedBlock = new ByteArrayCacheable(expectedBytes);

  MultithreadedTestUtil.TestContext ctx =
      new MultithreadedTestUtil.TestContext(new Configuration());
  final AtomicInteger completedReads = new AtomicInteger();

  // Seed the cache before any reader starts.
  toBeTested.cacheBlock(sharedKey, cachedBlock);

  int started = 0;
  while (started < numThreads) {
    TestThread reader = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        ByteArrayCacheable fetched =
            (ByteArrayCacheable) toBeTested.getBlock(sharedKey, false, false);
        assertArrayEquals(expectedBytes, fetched.buf);
        completedReads.incrementAndGet();
      }
    };
    reader.setDaemon(true);
    ctx.addThread(reader);
    started++;
  }

  ctx.startThreads();
  // Poll until enough reads have happened or the context asks us to stop.
  for (;;) {
    if (completedReads.get() >= numQueries || !ctx.shouldRun()) {
      break;
    }
    Thread.sleep(10);
  }
  ctx.stop();
}
/**
 * Caches a single 5 KB block under one key and then lets {@code numThreads} reader threads
 * fetch that key over and over, comparing the returned bytes against the original buffer on
 * every read. The calling thread polls until at least {@code numQueries} reads have completed
 * or the test context reports that it should no longer run, and then stops the context.
 *
 * @param toBeTested cache implementation under test
 * @param BlockSize not read by this method
 * @param numThreads number of reader threads to start
 * @param numQueries number of completed reads to wait for before stopping
 * @throws Exception if any cache call, test-context call or the polling sleep throws
 */
public static void hammerSingleKey(final BlockCache toBeTested, int BlockSize, int numThreads,
    int numQueries) throws Exception {
  final BlockCacheKey sharedKey = new BlockCacheKey("key", 0);
  final byte[] expectedBytes = new byte[5 * 1024];
  Arrays.fill(expectedBytes, (byte) 5);
  final ByteArrayCacheable cachedBlock = new ByteArrayCacheable(expectedBytes);

  MultithreadedTestUtil.TestContext ctx =
      new MultithreadedTestUtil.TestContext(new Configuration());
  final AtomicInteger completedReads = new AtomicInteger();

  // Seed the cache before any reader starts.
  toBeTested.cacheBlock(sharedKey, cachedBlock);

  int started = 0;
  while (started < numThreads) {
    TestThread reader = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        ByteArrayCacheable fetched =
            (ByteArrayCacheable) toBeTested.getBlock(sharedKey, false, false, true);
        assertArrayEquals(expectedBytes, fetched.buf);
        completedReads.incrementAndGet();
      }
    };
    reader.setDaemon(true);
    ctx.addThread(reader);
    started++;
  }

  ctx.startThreads();
  // Poll until enough reads have happened or the context asks us to stop.
  for (;;) {
    if (completedReads.get() >= numQueries || !ctx.shouldRun()) {
      break;
    }
    Thread.sleep(10);
  }
  ctx.stop();
}
public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize, final int numThreads, final int numQueries, final double passingScore) throws Exception { Configuration conf = new Configuration(); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( conf); final AtomicInteger totalQueries = new AtomicInteger(); final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<HFileBlockPair>(); final AtomicInteger hits = new AtomicInteger(); final AtomicInteger miss = new AtomicInteger(); HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize); blocksToTest.addAll(Arrays.asList(blocks)); for (int i = 0; i < numThreads; i++) { TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { @Override public void doAnAction() throws Exception { if (!blocksToTest.isEmpty()) { HFileBlockPair ourBlock = blocksToTest.poll(); // if we run out of blocks to test, then we should stop the tests. if (ourBlock == null) { ctx.setStopFlag(true); return; } toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block); Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true); if (retrievedBlock != null) { assertEquals(ourBlock.block, retrievedBlock); toBeTested.evictBlock(ourBlock.blockName); hits.incrementAndGet(); assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true)); } else { miss.incrementAndGet(); } totalQueries.incrementAndGet(); } } }; t.setDaemon(true); ctx.addThread(t); } ctx.startThreads(); while (!blocksToTest.isEmpty() && ctx.shouldRun()) { Thread.sleep(10); } ctx.stop(); if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) { fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get()); } }
/**
 * Floods the cache from several threads so that it is forced to evict. Each thread owns one
 * 5 KB buffer; on every action it walks 100 keys named {@code key_<thread>_<j>}, refills the
 * buffer with {@code (byte) (thread * j)}, and looks the key up. If the block is present its
 * bytes are compared with the buffer, otherwise the block is cached. After at least
 * {@code numQueries} actions have completed (or the context stops), the method asserts that the
 * cache reports an evicted count greater than zero.
 *
 * <p>NOTE(review): every block created by a thread wraps the same {@code buf} array, which is
 * refilled on each iteration. Whether {@code ByteArrayCacheable} copies the array is not visible
 * here — confirm that the content comparison is meaningful for caches that keep a reference.
 *
 * @param toBeTested cache implementation under test
 * @param BlockSize not read by this method
 * @param numThreads number of worker threads to start
 * @param numQueries number of completed actions (each covering 100 keys) to wait for
 * @throws Exception if any cache call, test-context call or the polling sleep throws
 */
public static void hammerEviction(final BlockCache toBeTested, int BlockSize, int numThreads,
    int numQueries) throws Exception {
  Configuration conf = new Configuration();
  MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
  final AtomicInteger totalQueries = new AtomicInteger();

  for (int i = 0; i < numThreads; i++) {
    final int finalI = i;
    final byte[] buf = new byte[5 * 1024];
    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        for (int j = 0; j < 100; j++) {
          BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
          Arrays.fill(buf, (byte) (finalI * j));
          final ByteArrayCacheable bac = new ByteArrayCacheable(buf);

          ByteArrayCacheable gotBack =
              (ByteArrayCacheable) toBeTested.getBlock(key, true, false, true);
          if (gotBack != null) {
            // Expected value first, value under test second, so that a failure message
            // labels the two arrays correctly.
            assertArrayEquals(bac.buf, gotBack.buf);
          } else {
            toBeTested.cacheBlock(key, bac);
          }
        }
        totalQueries.incrementAndGet();
      }
    };
    t.setDaemon(true);
    ctx.addThread(t);
  }

  ctx.startThreads();
  while (totalQueries.get() < numQueries && ctx.shouldRun()) {
    Thread.sleep(10);
  }
  ctx.stop();

  assertTrue(toBeTested.getStats().getEvictedCount() > 0);
}
public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize, final int numThreads, final int numQueries, final double passingScore) throws Exception { Configuration conf = new Configuration(); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( conf); final AtomicInteger totalQueries = new AtomicInteger(); final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<HFileBlockPair>(); final AtomicInteger hits = new AtomicInteger(); final AtomicInteger miss = new AtomicInteger(); HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize); blocksToTest.addAll(Arrays.asList(blocks)); for (int i = 0; i < numThreads; i++) { TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { @Override public void doAnAction() throws Exception { if (!blocksToTest.isEmpty()) { HFileBlockPair ourBlock = blocksToTest.poll(); // if we run out of blocks to test, then we should stop the tests. if (ourBlock == null) { ctx.setStopFlag(true); return; } toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block); Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false); if (retrievedBlock != null) { assertEquals(ourBlock.block, retrievedBlock); toBeTested.evictBlock(ourBlock.blockName); hits.incrementAndGet(); assertNull(toBeTested.getBlock(ourBlock.blockName, false, false)); } else { miss.incrementAndGet(); } totalQueries.incrementAndGet(); } } }; t.setDaemon(true); ctx.addThread(t); } ctx.startThreads(); while (!blocksToTest.isEmpty() && ctx.shouldRun()) { Thread.sleep(10); } ctx.stop(); if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) { fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get()); } }
/**
 * Floods the cache from several threads so that it is forced to evict. Each thread owns one
 * 5 KB buffer; on every action it walks 100 keys named {@code key_<thread>_<j>}, refills the
 * buffer with {@code (byte) (thread * j)}, and looks the key up. If the block is present its
 * bytes are compared with the buffer, otherwise the block is cached. After at least
 * {@code numQueries} actions have completed (or the context stops), the method asserts that the
 * cache reports an evicted count greater than zero.
 *
 * <p>NOTE(review): every block created by a thread wraps the same {@code buf} array, which is
 * refilled on each iteration. Whether {@code ByteArrayCacheable} copies the array is not visible
 * here — confirm that the content comparison is meaningful for caches that keep a reference.
 *
 * @param toBeTested cache implementation under test
 * @param BlockSize not read by this method
 * @param numThreads number of worker threads to start
 * @param numQueries number of completed actions (each covering 100 keys) to wait for
 * @throws Exception if any cache call, test-context call or the polling sleep throws
 */
public static void hammerEviction(final BlockCache toBeTested, int BlockSize, int numThreads,
    int numQueries) throws Exception {
  Configuration conf = new Configuration();
  MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
  final AtomicInteger totalQueries = new AtomicInteger();

  for (int i = 0; i < numThreads; i++) {
    final int finalI = i;
    final byte[] buf = new byte[5 * 1024];
    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        for (int j = 0; j < 100; j++) {
          BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
          Arrays.fill(buf, (byte) (finalI * j));
          final ByteArrayCacheable bac = new ByteArrayCacheable(buf);

          ByteArrayCacheable gotBack =
              (ByteArrayCacheable) toBeTested.getBlock(key, true, false);
          if (gotBack != null) {
            // Expected value first, value under test second, so that a failure message
            // labels the two arrays correctly.
            assertArrayEquals(bac.buf, gotBack.buf);
          } else {
            toBeTested.cacheBlock(key, bac);
          }
        }
        totalQueries.incrementAndGet();
      }
    };
    t.setDaemon(true);
    ctx.addThread(t);
  }

  ctx.startThreads();
  while (totalQueries.get() < numQueries && ctx.shouldRun()) {
    Thread.sleep(10);
  }
  ctx.stop();

  assertTrue(toBeTested.getStats().getEvictedCount() > 0);
}
/**
 * Checks batch puts against a region in three phases, using the {@code syncTimeNumOps} WAL
 * metric to count syncs:
 * <ol>
 * <li>ten valid puts all succeed and the counter advances by one;</li>
 * <li>the same batch with a bad column family added to put 5 reports {@code BAD_FAMILY} for
 * that put only, and the counter advances by one more;</li>
 * <li>with the row lock for {@code row_2} held by this thread, a second thread runs the batch;
 * this thread waits for the counter to move, releases the lock, joins the thread and expects
 * the counter to be at {@code syncs + 4} with the same per-put status codes.</li>
 * </ol>
 *
 * @throws Exception if region setup, a mutation, the test context or the wait loop throws
 */
@Test
public void testBatchPut() throws Exception {
  byte[] b = Bytes.toBytes(getName());
  byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
  byte[] qual = Bytes.toBytes("qual");
  byte[] val = Bytes.toBytes("val");
  this.region = initHRegion(b, getName(), CONF, cf);
  MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
  try {
    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);

    LOG.info("First a batch put with all valid puts");
    final Put[] puts = new Put[10];
    for (int i = 0; i < 10; i++) {
      puts[i] = new Put(Bytes.toBytes("row_" + i));
      puts[i].add(cf, qual, val);
    }

    OperationStatus[] codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

    LOG.info("Next a batch put with one invalid family");
    puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
    codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);

    LOG.info("Next a batch put that has to break into two batches to avoid a lock");
    RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));

    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
    final AtomicReference<OperationStatus[]> retFromThread =
        new AtomicReference<OperationStatus[]>();
    TestThread putter = new TestThread(ctx) {
      @Override
      public void doWork() throws IOException {
        retFromThread.set(region.batchMutate(puts));
      }
    };
    try {
      LOG.info("...starting put thread while holding lock");
      ctx.addThread(putter);
      ctx.startThreads();

      LOG.info("...waiting for put thread to sync first time");
      long startWait = System.currentTimeMillis();
      while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
        Thread.sleep(100);
        if (System.currentTimeMillis() - startWait > 10000) {
          fail("Timed out waiting for thread to sync first minibatch");
        }
      }
      LOG.info("...releasing row lock, which should let put thread continue");
    } finally {
      // Release in a finally block so the row lock is not left held when the wait above
      // times out (fail() throws) or is interrupted.
      rowLock.release();
    }

    LOG.info("...joining on thread");
    ctx.stop();

    LOG.info("...checking that next batch was synced");
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
    codes = retFromThread.get();
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }
  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
}
@Test public void testAtomicBatchPut() throws IOException { final Put[] puts = new Put[10]; MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); try { long syncs = prepareRegionForBachPut(puts, source, false); // 1. Straight forward case, should succeed MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, HConstants.NO_NONCE); OperationStatus[] codes = this.region.batchMutate(batchOp); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); } metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); // 2. Failed to get lock RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3)); // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); final AtomicReference<IOException> retFromThread = new AtomicReference<>(); final CountDownLatch finishedPuts = new CountDownLatch(1); final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true, HConstants .NO_NONCE, HConstants.NO_NONCE); TestThread putter = new TestThread(ctx) { @Override public void doWork() throws IOException { try { region.batchMutate(finalBatchOp); } catch (IOException ioe) { LOG.error("test failed!", ioe); retFromThread.set(ioe); } finishedPuts.countDown(); } }; LOG.info("...starting put thread while holding locks"); ctx.addThread(putter); ctx.startThreads(); LOG.info("...waiting for batch puts while holding locks"); try { finishedPuts.await(); } catch (InterruptedException e) { LOG.error("Interrupted!", e); } finally { if (lock != null) { lock.release(); } } assertNotNull(retFromThread.get()); metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); // 3. 
Exception thrown in validation LOG.info("Next a batch put with one invalid family"); puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, HConstants.NO_NONCE); thrown.expect(NoSuchColumnFamilyException.class); this.region.batchMutate(batchOp); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); this.region = null; } }
public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize, final int numThreads, final int numQueries, final double passingScore) throws Exception { Configuration conf = new Configuration(); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext( conf); final AtomicInteger totalQueries = new AtomicInteger(); final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>(); final AtomicInteger hits = new AtomicInteger(); final AtomicInteger miss = new AtomicInteger(); HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize); blocksToTest.addAll(Arrays.asList(blocks)); for (int i = 0; i < numThreads; i++) { TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { @Override public void doAnAction() throws Exception { if (!blocksToTest.isEmpty()) { HFileBlockPair ourBlock = blocksToTest.poll(); // if we run out of blocks to test, then we should stop the tests. if (ourBlock == null) { ctx.setStopFlag(true); return; } toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block); Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true); if (retrievedBlock != null) { assertEquals(ourBlock.block, retrievedBlock); toBeTested.evictBlock(ourBlock.blockName); hits.incrementAndGet(); assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true)); } else { miss.incrementAndGet(); } totalQueries.incrementAndGet(); } } }; t.setDaemon(true); ctx.addThread(t); } ctx.startThreads(); while (!blocksToTest.isEmpty() && ctx.shouldRun()) { Thread.sleep(10); } ctx.stop(); if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) { fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get()); } }
/**
 * Checks batch puts against a region in three phases, using the {@code syncTimeNumOps} WAL
 * metric to count syncs:
 * <ol>
 * <li>ten valid puts all succeed and the counter advances by one;</li>
 * <li>the same batch with a bad column family added to put 5 reports {@code BAD_FAMILY} for
 * that put only, and the counter advances by one more;</li>
 * <li>with the row lock for {@code row_2} held by this thread, a second thread runs the batch;
 * this thread waits for the counter to move, releases the lock, joins the thread and expects
 * the counter to be at {@code syncs + 4} with the same per-put status codes.</li>
 * </ol>
 *
 * @throws Exception if region setup, a mutation, the test context or the wait loop throws
 */
@Test
public void testBatchPut() throws Exception {
  byte[] b = Bytes.toBytes(getName());
  byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
  byte[] qual = Bytes.toBytes("qual");
  byte[] val = Bytes.toBytes("val");
  this.region = initHRegion(b, getName(), conf, cf);
  MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
  try {
    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);

    LOG.info("First a batch put with all valid puts");
    final Put[] puts = new Put[10];
    for (int i = 0; i < 10; i++) {
      puts[i] = new Put(Bytes.toBytes("row_" + i));
      puts[i].add(cf, qual, val);
    }

    OperationStatus[] codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

    LOG.info("Next a batch put with one invalid family");
    puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
    codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);

    LOG.info("Next a batch put that has to break into two batches to avoid a lock");
    RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));

    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
    final AtomicReference<OperationStatus[]> retFromThread =
        new AtomicReference<OperationStatus[]>();
    TestThread putter = new TestThread(ctx) {
      @Override
      public void doWork() throws IOException {
        retFromThread.set(region.batchMutate(puts));
      }
    };
    try {
      LOG.info("...starting put thread while holding lock");
      ctx.addThread(putter);
      ctx.startThreads();

      LOG.info("...waiting for put thread to sync first time");
      long startWait = System.currentTimeMillis();
      while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
        Thread.sleep(100);
        if (System.currentTimeMillis() - startWait > 10000) {
          fail("Timed out waiting for thread to sync first minibatch");
        }
      }
      LOG.info("...releasing row lock, which should let put thread continue");
    } finally {
      // Release in a finally block so the row lock is not left held when the wait above
      // times out (fail() throws) or is interrupted.
      rowLock.release();
    }

    LOG.info("...joining on thread");
    ctx.stop();

    LOG.info("...checking that next batch was synced");
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
    codes = retFromThread.get();
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }
  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
}