Java 类org.apache.hadoop.hbase.regionserver.Region.RowLock 实例源码

项目:hbase    文件:TestHRegion.java   
/**
 * Runs three consecutive batch-put scenarios against {@code region.batchMutate}:
 * <ol>
 *   <li>a plain batch of ten puts, where every returned status must be {@code SUCCESS} and the
 *       {@code syncTimeNumOps} counter must advance by one;</li>
 *   <li>the same batch submitted from another thread while this thread holds the lock on
 *       {@code row_3}; the worker is expected to end with an {@link IOException} and the
 *       {@code syncTimeNumOps} counter must not advance;</li>
 *   <li>a batch in which one put targets the column family {@code BAD_CF}, which is expected to
 *       raise {@code NoSuchColumnFamilyException}.</li>
 * </ol>
 * The region and its WAL are closed in all cases.
 *
 * <p>NOTE(review): the {@code true} argument passed to {@code MutationBatchOperation} is
 * presumably the "atomic" flag (going by the test name) - confirm against that constructor.
 *
 * @throws IOException if preparing the region or a batch mutation on the test thread fails
 */
@Test
public void testAtomicBatchPut() throws IOException {
  final Put[] puts = new Put[10];
  MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
  try {
    long syncs = prepareRegionForBachPut(puts, source, false);

    // 1. Straight forward case, should succeed
    MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
    OperationStatus[] codes = this.region.batchMutate(batchOp);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

    // 2. Failed to get lock
    RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
    // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
    // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
    final AtomicReference<IOException> retFromThread = new AtomicReference<>();
    final CountDownLatch finishedPuts = new CountDownLatch(1);
    final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
    TestThread putter = new TestThread(ctx) {
      @Override
      public void doWork() throws IOException {
        try {
          region.batchMutate(finalBatchOp);
        } catch (IOException ioe) {
          // The IOException is the outcome this scenario asserts on; hand it to the test thread.
          LOG.error("test failed!", ioe);
          retFromThread.set(ioe);
        } finally {
          // Count down on every exit path: if batchMutate fails with something other than an
          // IOException, the test thread must still be woken up, otherwise it would wait on the
          // latch forever while holding the row lock.
          finishedPuts.countDown();
        }
      }
    };
    LOG.info("...starting put thread while holding locks");
    ctx.addThread(putter);
    ctx.startThreads();
    LOG.info("...waiting for batch puts while holding locks");
    try {
      finishedPuts.await();
    } catch (InterruptedException e) {
      LOG.error("Interrupted!", e);
      // Restore the interrupt status so the interruption is not lost to the caller.
      Thread.currentThread().interrupt();
    } finally {
      // Release the row lock whether or not the wait completed normally.
      if (lock != null) {
        lock.release();
      }
    }
    assertNotNull(retFromThread.get());
    // The batch that failed in the worker thread must not have advanced the sync counter.
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

    // 3. Exception thrown in validation
    LOG.info("Next a batch put with one invalid family");
    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
    batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
        HConstants.NO_NONCE);
    thrown.expect(NoSuchColumnFamilyException.class);
    this.region.batchMutate(batchOp);
  } finally {
    HBaseTestingUtility.closeRegionAndWAL(this.region);
    this.region = null;
  }
}