@Test public void testAtomicBatchPut() throws IOException { final Put[] puts = new Put[10]; MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); try { long syncs = prepareRegionForBachPut(puts, source, false); // 1. Straight forward case, should succeed MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, HConstants.NO_NONCE); OperationStatus[] codes = this.region.batchMutate(batchOp); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); } metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); // 2. Failed to get lock RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3)); // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); final AtomicReference<IOException> retFromThread = new AtomicReference<>(); final CountDownLatch finishedPuts = new CountDownLatch(1); final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true, HConstants .NO_NONCE, HConstants.NO_NONCE); TestThread putter = new TestThread(ctx) { @Override public void doWork() throws IOException { try { region.batchMutate(finalBatchOp); } catch (IOException ioe) { LOG.error("test failed!", ioe); retFromThread.set(ioe); } finishedPuts.countDown(); } }; LOG.info("...starting put thread while holding locks"); ctx.addThread(putter); ctx.startThreads(); LOG.info("...waiting for batch puts while holding locks"); try { finishedPuts.await(); } catch (InterruptedException e) { LOG.error("Interrupted!", e); } finally { if (lock != null) { lock.release(); } } assertNotNull(retFromThread.get()); metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); // 3. Exception thrown in validation LOG.info("Next a batch put with one invalid family"); puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, HConstants.NO_NONCE); thrown.expect(NoSuchColumnFamilyException.class); this.region.batchMutate(batchOp); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); this.region = null; } }