private void runOneTest(String description, Configuration conf) throws Exception { int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); long startKey = (long)preloadKeys * numServers; long endKey = startKey + (long)writeKeys * numServers; status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d", description, numServers, startKey, endKey)); if (preloadKeys > 0) { MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); long time = System.currentTimeMillis(); preloader.start(0, startKey, writeThreads); preloader.waitForFinish(); if (preloader.getNumWriteFailures() > 0) { throw new IOException("Preload failed"); } int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary status(description + " preload took " + (System.currentTimeMillis()-time)/1000 + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize"); Thread.sleep(waitTime); } MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100); // reader.getMetrics().enable(); reader.linkToWriter(writer); long testStartTime = System.currentTimeMillis(); writer.start(startKey, endKey, writeThreads); reader.start(startKey, endKey, readThreads); writer.waitForFinish(); reader.waitForFinish(); // reader.waitForVerification(300000); // reader.abortAndWaitForFinish(); status("Readers and writers stopped for test " + description); boolean success = writer.getNumWriteFailures() == 0; if (!success) { LOG.error("Write failed"); } else { success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0; if (!success) { LOG.error("Read failed"); } } // Dump perf regardless of the result. /*StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next(); perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(), pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/ status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec"); Assert.assertTrue(success); }
@Test public void testReadersAndWriters() throws Exception { Configuration conf = util.getConfiguration(); String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName()); long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES); long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER; Table table = new HTable(conf, TABLE_NAME); // Create multi-threaded writer and start it. We write multiple columns/CFs and verify // their integrity, therefore multi-put is necessary. MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); writer.setMultiPut(true); LOG.info("Starting writer; the number of keys to write is " + keysToWrite); // TODO : Need to see if tag support has to be given here in the integration test suite writer.start(1, keysToWrite, WRITER_THREADS); // Now, do scans. long now = EnvironmentEdgeManager.currentTime(); long timeLimit = now + (maxRuntime * 60000); boolean isWriterDone = false; while (now < timeLimit && !isWriterDone) { LOG.info("Starting the scan; wrote approximately " + dataGen.getTotalNumberOfKeys() + " keys"); isWriterDone = writer.isDone(); if (isWriterDone) { LOG.info("Scanning full result, writer is done"); } Scan scan = new Scan(); for (byte[] cf : dataGen.getColumnFamilies()) { scan.addFamily(cf); } scan.setFilter(dataGen.getScanFilter()); scan.setLoadColumnFamiliesOnDemand(true); // The number of keys we can expect from scan - lower bound (before scan). // Not a strict lower bound - writer knows nothing about filters, so we report // this from generator. Writer might have generated the value but not put it yet. long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys(); long startTs = EnvironmentEdgeManager.currentTime(); ResultScanner results = table.getScanner(scan); long resultCount = 0; Result result = null; // Verify and count the results. while ((result = results.next()) != null) { boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true); Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk); ++resultCount; } long timeTaken = EnvironmentEdgeManager.currentTime() - startTs; // Verify the result count. long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys(); Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan + " were generated ", onesGennedAfterScan >= resultCount); if (isWriterDone) { Assert.assertTrue("Read " + resultCount + " keys; the writer is done and " + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount); } else if (onesGennedBeforeScan * 0.9 > resultCount) { LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan + ") - there might be a problem, or the writer might just be slow"); } LOG.info("Scan took " + timeTaken + "ms"); if (!isWriterDone) { Thread.sleep(WAIT_BETWEEN_SCANS_MS); now = EnvironmentEdgeManager.currentTime(); } } Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures()); Assert.assertTrue("Writer is not done", isWriterDone); // Assert.fail("Boom!"); }
private void runOneTest(String description, Configuration conf) throws Exception { int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); long startKey = (long)preloadKeys * numServers; long endKey = startKey + (long)writeKeys * numServers; status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d", description, numServers, startKey, endKey)); TableName tn = TableName.valueOf(TABLE_NAME); if (preloadKeys > 0) { MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn); long time = System.currentTimeMillis(); preloader.start(0, startKey, writeThreads); preloader.waitForFinish(); if (preloader.getNumWriteFailures() > 0) { throw new IOException("Preload failed"); } int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary status(description + " preload took " + (System.currentTimeMillis()-time)/1000 + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize"); Thread.sleep(waitTime); } MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tn); MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tn, 100); // reader.getMetrics().enable(); reader.linkToWriter(writer); long testStartTime = System.currentTimeMillis(); writer.start(startKey, endKey, writeThreads); reader.start(startKey, endKey, readThreads); writer.waitForFinish(); reader.waitForFinish(); // reader.waitForVerification(300000); // reader.abortAndWaitForFinish(); status("Readers and writers stopped for test " + description); boolean success = writer.getNumWriteFailures() == 0; if (!success) { LOG.error("Write failed"); } else { success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0; if (!success) { LOG.error("Read failed"); } } // Dump perf regardless of the result. /*StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next(); perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(), pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/ status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec"); Assert.assertTrue(success); }
@Test public void testReadersAndWriters() throws Exception { Configuration conf = util.getConfiguration(); String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName()); long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES); long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER; HTable table = new HTable(conf, TABLE_NAME); // Create multi-threaded writer and start it. We write multiple columns/CFs and verify // their integrity, therefore multi-put is necessary. MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); writer.setMultiPut(true); LOG.info("Starting writer; the number of keys to write is " + keysToWrite); // TODO : Need to see if tag support has to be given here in the integration test suite writer.start(1, keysToWrite, WRITER_THREADS); // Now, do scans. long now = EnvironmentEdgeManager.currentTimeMillis(); long timeLimit = now + (maxRuntime * 60000); boolean isWriterDone = false; while (now < timeLimit && !isWriterDone) { LOG.info("Starting the scan; wrote approximately " + dataGen.getTotalNumberOfKeys() + " keys"); isWriterDone = writer.isDone(); if (isWriterDone) { LOG.info("Scanning full result, writer is done"); } Scan scan = new Scan(); for (byte[] cf : dataGen.getColumnFamilies()) { scan.addFamily(cf); } scan.setFilter(dataGen.getScanFilter()); scan.setLoadColumnFamiliesOnDemand(true); // The number of keys we can expect from scan - lower bound (before scan). // Not a strict lower bound - writer knows nothing about filters, so we report // this from generator. Writer might have generated the value but not put it yet. long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys(); long startTs = EnvironmentEdgeManager.currentTimeMillis(); ResultScanner results = table.getScanner(scan); long resultCount = 0; Result result = null; // Verify and count the results. while ((result = results.next()) != null) { boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true); Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk); ++resultCount; } long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs; // Verify the result count. long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys(); Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan + " were generated ", onesGennedAfterScan >= resultCount); if (isWriterDone) { Assert.assertTrue("Read " + resultCount + " keys; the writer is done and " + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount); } else if (onesGennedBeforeScan * 0.9 > resultCount) { LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan + ") - there might be a problem, or the writer might just be slow"); } LOG.info("Scan took " + timeTaken + "ms"); if (!isWriterDone) { Thread.sleep(WAIT_BETWEEN_SCANS_MS); now = EnvironmentEdgeManager.currentTimeMillis(); } } Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures()); Assert.assertTrue("Writer is not done", isWriterDone); // Assert.fail("Boom!"); }
private void runOneTest(String description, Configuration conf) throws Exception { int numServers = util.getHBaseClusterInterface() .getClusterMetrics().getLiveServerMetrics().size(); long startKey = preloadKeys * numServers; long endKey = startKey + writeKeys * numServers; status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d", description, numServers, startKey, endKey)); if (preloadKeys > 0) { MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); long time = System.currentTimeMillis(); preloader.start(0, startKey, writeThreads); preloader.waitForFinish(); if (preloader.getNumWriteFailures() > 0) { throw new IOException("Preload failed"); } int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary status(description + " preload took " + (System.currentTimeMillis()-time)/1000 + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize"); Thread.sleep(waitTime); } MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100); // reader.getMetrics().enable(); reader.linkToWriter(writer); long testStartTime = System.currentTimeMillis(); writer.start(startKey, endKey, writeThreads); reader.start(startKey, endKey, readThreads); writer.waitForFinish(); reader.waitForFinish(); // reader.waitForVerification(300000); // reader.abortAndWaitForFinish(); status("Readers and writers stopped for test " + description); boolean success = writer.getNumWriteFailures() == 0; if (!success) { LOG.error("Write failed"); } else { success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0; if (!success) { LOG.error("Read failed"); } } // Dump perf regardless of the result. /*StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next(); perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(), pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/ status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec"); Assert.assertTrue(success); }
@Test public void testReadersAndWriters() throws Exception { Configuration conf = util.getConfiguration(); String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName()); long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES); long serverCount = util.getHBaseClusterInterface().getClusterMetrics() .getLiveServerMetrics().size(); long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER; Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TABLE_NAME); // Create multi-threaded writer and start it. We write multiple columns/CFs and verify // their integrity, therefore multi-put is necessary. MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); writer.setMultiPut(true); LOG.info("Starting writer; the number of keys to write is " + keysToWrite); // TODO : Need to see if tag support has to be given here in the integration test suite writer.start(1, keysToWrite, WRITER_THREADS); // Now, do scans. long now = EnvironmentEdgeManager.currentTime(); long timeLimit = now + (maxRuntime * 60000); boolean isWriterDone = false; while (now < timeLimit && !isWriterDone) { LOG.info("Starting the scan; wrote approximately " + dataGen.getTotalNumberOfKeys() + " keys"); isWriterDone = writer.isDone(); if (isWriterDone) { LOG.info("Scanning full result, writer is done"); } Scan scan = new Scan(); for (byte[] cf : dataGen.getColumnFamilies()) { scan.addFamily(cf); } scan.setFilter(dataGen.getScanFilter()); scan.setLoadColumnFamiliesOnDemand(true); // The number of keys we can expect from scan - lower bound (before scan). // Not a strict lower bound - writer knows nothing about filters, so we report // this from generator. Writer might have generated the value but not put it yet. long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys(); long startTs = EnvironmentEdgeManager.currentTime(); ResultScanner results = table.getScanner(scan); long resultCount = 0; Result result = null; // Verify and count the results. while ((result = results.next()) != null) { boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true); Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk); ++resultCount; } long timeTaken = EnvironmentEdgeManager.currentTime() - startTs; // Verify the result count. long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys(); Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan + " were generated ", onesGennedAfterScan >= resultCount); if (isWriterDone) { Assert.assertTrue("Read " + resultCount + " keys; the writer is done and " + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount); } else if (onesGennedBeforeScan * 0.9 > resultCount) { LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan + ") - there might be a problem, or the writer might just be slow"); } LOG.info("Scan took " + timeTaken + "ms"); if (!isWriterDone) { Thread.sleep(WAIT_BETWEEN_SCANS_MS); now = EnvironmentEdgeManager.currentTime(); } } Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures()); Assert.assertTrue("Writer is not done", isWriterDone); // Assert.fail("Boom!"); connection.close(); }
@Test public void testReadersAndWriters() throws Exception { Configuration conf = util.getConfiguration(); String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName()); long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES); long serverCount = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER; HTable table = new HTable(conf, Bytes.toBytes(TABLE_NAME)); // Create multi-threaded writer and start it. We write multiple columns/CFs and verify // their integrity, therefore multi-put is necessary. MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, Bytes.toBytes(TABLE_NAME)); writer.setMultiPut(true); LOG.info("Starting writer; the number of keys to write is " + keysToWrite); writer.start(1, keysToWrite, WRITER_THREADS); // Now, do scans. long now = EnvironmentEdgeManager.currentTimeMillis(); long timeLimit = now + (maxRuntime * 60000); boolean isWriterDone = false; while (now < timeLimit && !isWriterDone) { LOG.info("Starting the scan; wrote approximately " + dataGen.getTotalNumberOfKeys() + " keys"); isWriterDone = writer.isDone(); if (isWriterDone) { LOG.info("Scanning full result, writer is done"); } Scan scan = new Scan(); for (byte[] cf : dataGen.getColumnFamilies()) { scan.addFamily(cf); } scan.setFilter(dataGen.getScanFilter()); scan.setLoadColumnFamiliesOnDemand(true); // The number of keys we can expect from scan - lower bound (before scan). // Not a strict lower bound - writer knows nothing about filters, so we report // this from generator. Writer might have generated the value but not put it yet. long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys(); long startTs = EnvironmentEdgeManager.currentTimeMillis(); ResultScanner results = table.getScanner(scan); long resultCount = 0; Result result = null; // Verify and count the results. while ((result = results.next()) != null) { boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true); Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk); ++resultCount; } long timeTaken = EnvironmentEdgeManager.currentTimeMillis() - startTs; // Verify the result count. long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys(); Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan + " were generated ", onesGennedAfterScan >= resultCount); if (isWriterDone) { Assert.assertTrue("Read " + resultCount + " keys; the writer is done and " + onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount); } else if (onesGennedBeforeScan * 0.9 > resultCount) { LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan + ") - there might be a problem, or the writer might just be slow"); } LOG.info("Scan took " + timeTaken + "ms"); if (!isWriterDone) { Thread.sleep(WAIT_BETWEEN_SCANS_MS); now = EnvironmentEdgeManager.currentTimeMillis(); } } Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures()); Assert.assertTrue("Writer is not done", isWriterDone); // Assert.fail("Boom!"); }