/** * @param op The operation is about to be taken on the region * @throws NotServingRegionException * @throws RegionTooBusyException * @throws InterruptedIOException */ protected void startRegionOperation(Operation op) throws NotServingRegionException, RegionTooBusyException, InterruptedIOException { switch (op) { case INCREMENT: case APPEND: case GET: case SCAN: case SPLIT_REGION: case MERGE_REGION: case PUT: case DELETE: case BATCH_MUTATE: case COMPACT_REGION: // when a region is in recovering state, no read, split or merge is allowed if (this.isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering"); } break; default: break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION || op == Operation.COMPACT_REGION) { // split, merge or compact region doesn't need to check the closing/closed state or lock the // region return; } if (this.closing.get()) { throw new NotServingRegionException(getRegionNameAsString() + " is closing"); } lock(lock.readLock()); if (this.closed.get()) { lock.readLock().unlock(); throw new NotServingRegionException(getRegionNameAsString() + " is closed"); } }
@Override @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "Intentional") public void startRegionOperation(Operation op) throws IOException { switch (op) { case GET: // read operations case SCAN: checkReadsEnabled(); case INCREMENT: // write operations case APPEND: case SPLIT_REGION: case MERGE_REGION: case PUT: case DELETE: case BATCH_MUTATE: case COMPACT_REGION: // when a region is in recovering state, no read, split or merge is // allowed if (isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException( getRegionInfo().getRegionNameAsString() + " is recovering; cannot take reads"); } break; default: break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION || op == Operation.COMPACT_REGION) { // split, merge or compact region doesn't need to check the closing/closed // state or lock the // region return; } if (this.closing.get()) { throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing"); } lock(lock.readLock()); if (this.closed.get()) { lock.readLock().unlock(); throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed"); } try { if (coprocessorHost != null) { coprocessorHost.postStartRegionOperation(op); } } catch (Exception e) { lock.readLock().unlock(); throw new IOException(e); } }
/** * @param op The operation is about to be taken on the region * @throws IOException */ protected void startRegionOperation(Operation op) throws IOException { switch (op) { case GET: // read operations case SCAN: checkReadsEnabled(); case INCREMENT: // write operations case APPEND: case SPLIT_REGION: case MERGE_REGION: case PUT: case DELETE: case BATCH_MUTATE: case COMPACT_REGION: // when a region is in recovering state, no read, split or merge is allowed if (isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering; cannot take reads"); } break; default: break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION || op == Operation.COMPACT_REGION) { // split, merge or compact region doesn't need to check the closing/closed state or lock the // region return; } if (this.closing.get()) { throw new NotServingRegionException(getRegionNameAsString() + " is closing"); } lock(lock.readLock()); if (this.closed.get()) { lock.readLock().unlock(); throw new NotServingRegionException(getRegionNameAsString() + " is closed"); } try { if (coprocessorHost != null) { coprocessorHost.postStartRegionOperation(op); } } catch (Exception e) { lock.readLock().unlock(); throw new IOException(e); } }
@Test(timeout = 300000) public void testDisallowWritesInRecovering() throws Exception { LOG.info("testDisallowWritesInRecovering"); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); startCluster(NUM_RS); final int NUM_REGIONS_TO_CREATE = 40; // turn off load balancing to prevent regions from moving around otherwise // they will consume recovered.edits master.balanceSwitch(false); List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>(); HRegionInfo region = null; HRegionServer hrs = null; HRegionServer dstRS = null; for (int i = 0; i < NUM_RS; i++) { hrs = rsts.get(i).getRegionServer(); List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); if (regions.isEmpty()) continue; region = regions.get(0); if (region.isMetaRegion()) continue; regionSet.add(region); dstRS = rsts.get((i+1) % NUM_RS).getRegionServer(); break; } slm.markRegionsRecovering(hrs.getServerName(), regionSet); // move region in order for the region opened in recovering state final HRegionInfo hri = region; final HRegionServer tmpRS = dstRS; TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(), Bytes.toBytes(dstRS.getServerName().getServerName())); // wait for region move completes final RegionStates regionStates = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { ServerName sn = regionStates.getRegionServerOfRegion(hri); return (sn != null && sn.equals(tmpRS.getServerName())); } }); try { byte[] key = region.getStartKey(); if (key == null || key.length == 0) { key = new byte[] { 0, 0, 0, 0, 1 }; } Put put = new Put(key); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); ht.put(put); } catch (IOException ioe) { Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException); RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe; boolean foundRegionInRecoveryException = false; for (Throwable t : re.getCauses()) { if (t instanceof RegionInRecoveryException) { foundRegionInRecoveryException = true; break; } } Assert.assertTrue( "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(), foundRegionInRecoveryException); } ht.close(); zkw.close(); }
/** * @param op The operation is about to be taken on the region * @throws IOException */ public void startRegionOperation(Operation op) throws IOException { switch (op) { case INCREMENT: case APPEND: case GET: case SCAN: case SPLIT_REGION: case MERGE_REGION: case PUT: case DELETE: case BATCH_MUTATE: case COMPACT_REGION: // when a region is in recovering state, no read, split or merge is allowed if (this.isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering"); } break; default: break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION || op == Operation.COMPACT_REGION) { // split, merge or compact region doesn't need to check the closing/closed state or lock the // region return; } if (this.closing.get()) { throw new NotServingRegionException(getRegionNameAsString() + " is closing"); } lock(lock.readLock()); if (this.closed.get()) { lock.readLock().unlock(); throw new NotServingRegionException(getRegionNameAsString() + " is closed"); } try { if (coprocessorHost != null) { coprocessorHost.postStartRegionOperation(op); } } catch (Exception e) { lock.readLock().unlock(); throw new IOException(e); } }
@Test(timeout = 300000) public void testDisallowWritesInRecovering() throws Exception { LOG.info("testDisallowWritesInRecovering"); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); startCluster(NUM_RS); final int NUM_REGIONS_TO_CREATE = 40; // turn off load balancing to prevent regions from moving around otherwise // they will consume recovered.edits master.balanceSwitch(false); List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>(); HRegionInfo region = null; HRegionServer hrs = null; HRegionServer dstRS = null; for (int i = 0; i < NUM_RS; i++) { hrs = rsts.get(i).getRegionServer(); List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs); if (regions.isEmpty()) continue; region = regions.get(0); regionSet.add(region); dstRS = rsts.get((i+1) % NUM_RS).getRegionServer(); break; } slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet); // move region in order for the region opened in recovering state final HRegionInfo hri = region; final HRegionServer tmpRS = dstRS; TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(), Bytes.toBytes(dstRS.getServerName().getServerName())); // wait for region move completes final RegionStates regionStates = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { ServerName sn = regionStates.getRegionServerOfRegion(hri); return (sn != null && sn.equals(tmpRS.getServerName())); } }); try { byte[] key = region.getStartKey(); if (key == null || key.length == 0) { key = new byte[] { 0, 0, 0, 0, 1 }; } ht.setAutoFlush(true, true); Put put = new Put(key); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); ht.put(put); ht.close(); } catch (IOException ioe) { Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException); RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe; boolean foundRegionInRecoveryException = false; for (Throwable t : re.getCauses()) { if (t instanceof RegionInRecoveryException) { foundRegionInRecoveryException = true; break; } } Assert.assertTrue( "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(), foundRegionInRecoveryException); } zkw.close(); }
/** * @param op The operation is about to be taken on the region * @throws IOException */ protected void startRegionOperation(Operation op) throws IOException { switch (op) { case INCREMENT: case APPEND: case GET: case SCAN: case SPLIT_REGION: case MERGE_REGION: case PUT: case DELETE: case BATCH_MUTATE: case COMPACT_REGION: // when a region is in recovering state, no read, split or merge is allowed if (this.isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering"); } break; default: break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION || op == Operation.COMPACT_REGION) { // split, merge or compact region doesn't need to check the closing/closed state or lock the // region return; } if (this.closing.get()) { throw new NotServingRegionException(getRegionNameAsString() + " is closing"); } lock(lock.readLock()); if (this.closed.get()) { lock.readLock().unlock(); throw new NotServingRegionException(getRegionNameAsString() + " is closed"); } try { if (coprocessorHost != null) { coprocessorHost.postStartRegionOperation(op); } } catch (Exception e) { lock.readLock().unlock(); throw new IOException(e); } }
@Test(timeout = 300000) public void testDisallowWritesInRecovering() throws Exception { LOG.info("testDisallowWritesInRecovering"); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); startCluster(NUM_RS); final int NUM_REGIONS_TO_CREATE = 40; // turn off load balancing to prevent regions from moving around otherwise // they will consume recovered.edits master.balanceSwitch(false); List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>(); HRegionInfo region = null; HRegionServer hrs = null; HRegionServer dstRS = null; for (int i = 0; i < NUM_RS; i++) { hrs = rsts.get(i).getRegionServer(); List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); if (regions.isEmpty()) continue; region = regions.get(0); regionSet.add(region); dstRS = rsts.get((i+1) % NUM_RS).getRegionServer(); break; } slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet); // move region in order for the region opened in recovering state final HRegionInfo hri = region; final HRegionServer tmpRS = dstRS; TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(), Bytes.toBytes(dstRS.getServerName().getServerName())); // wait for region move completes final RegionStates regionStates = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { ServerName sn = regionStates.getRegionServerOfRegion(hri); return (sn != null && sn.equals(tmpRS.getServerName())); } }); try { byte[] key = region.getStartKey(); if (key == null || key.length == 0) { key = new byte[] { 0, 0, 0, 0, 1 }; } ht.setAutoFlush(true, true); Put put = new Put(key); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); ht.put(put); } catch (IOException ioe) { Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException); RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe; boolean foundRegionInRecoveryException = false; for (Throwable t : re.getCauses()) { if (t instanceof RegionInRecoveryException) { foundRegionInRecoveryException = true; break; } } Assert.assertTrue( "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(), foundRegionInRecoveryException); } ht.close(); zkw.close(); }
@Test(timeout = 300000) public void testDisallowWritesInRecovering() throws Exception { LOG.info("testDisallowWritesInRecovering"); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); startCluster(NUM_RS); final int NUM_REGIONS_TO_CREATE = 40; // turn off load balancing to prevent regions from moving around otherwise // they will consume recovered.edits master.balanceSwitch(false); List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>(); HRegionInfo region = null; HRegionServer hrs = null; HRegionServer dstRS = null; for (int i = 0; i < NUM_RS; i++) { hrs = rsts.get(i).getRegionServer(); List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs); if (regions.isEmpty()) continue; region = regions.get(0); regionSet.add(region); dstRS = rsts.get((i+1) % NUM_RS).getRegionServer(); break; } slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet); // move region in order for the region opened in recovering state final HRegionInfo hri = region; final HRegionServer tmpRS = dstRS; TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(), Bytes.toBytes(dstRS.getServerName().getServerName())); // wait for region move completes final RegionStates regionStates = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { ServerName sn = regionStates.getRegionServerOfRegion(hri); return (sn != null && sn.equals(tmpRS.getServerName())); } }); try { byte[] key = region.getStartKey(); if (key == null || key.length == 0) { key = new byte[] { 0, 0, 0, 0, 1 }; } ht.setAutoFlush(true, true); Put put = new Put(key); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); ht.put(put); ht.close(); } catch (IOException ioe) { Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException); RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe; boolean foundRegionInRecoveryException = false; for (Throwable t : re.getCauses()) { if (t instanceof RegionInRecoveryException) { foundRegionInRecoveryException = true; break; } } Assert.assertTrue( "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(), foundRegionInRecoveryException); } zkw.close(); }