/** * Locate destination region based on table name & row. This function also makes sure the * destination region is online for replay. * @throws IOException */ private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, TableName table, byte[] row, String originalEncodedRegionName) throws IOException { // fetch location from cache HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); if(loc != null) return loc; // fetch location from hbase:meta directly without using cache to avoid hit old dead server loc = hconn.getRegionLocation(table, row, true); if (loc == null) { throw new IOException("Can't locate location for row:" + Bytes.toString(row) + " of table:" + table); } // check if current row moves to a different region due to region merge/split if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { // originalEncodedRegionName should have already flushed lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); if (tmpLoc != null) return tmpLoc; } Long lastFlushedSequenceId = -1l; AtomicBoolean isRecovering = new AtomicBoolean(true); loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); if (!isRecovering.get()) { // region isn't in recovering at all because WAL file may contain a region that has // been moved to somewhere before hosting RS fails lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + " because it's not in recovering."); } else { Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, loc.getRegionInfo().getEncodedName()); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList(); for (StoreSequenceId id : maxSeqIdInStores) { storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); } regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); } if (cachedLastFlushedSequenceId == null || lastFlushedSequenceId > cachedLastFlushedSequenceId) { lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); } } onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); return loc; }
/** * Locate destination region based on table name & row. This function also makes sure the * destination region is online for replay. * @throws IOException */ private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, TableName table, byte[] row, String originalEncodedRegionName) throws IOException { // fetch location from cache HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); if(loc != null) return loc; // fetch location from hbase:meta directly without using cache to avoid hit old dead server loc = hconn.getRegionLocation(table, row, true); if (loc == null) { throw new IOException("Can't locate location for row:" + Bytes.toString(row) + " of table:" + table); } // check if current row moves to a different region due to region merge/split if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { // originalEncodedRegionName should have already flushed lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); if (tmpLoc != null) return tmpLoc; } Long lastFlushedSequenceId = -1l; AtomicBoolean isRecovering = new AtomicBoolean(true); loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); if (!isRecovering.get()) { // region isn't in recovering at all because WAL file may contain a region that has // been moved to somewhere before hosting RS fails lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + " because it's not in recovering."); } else { Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc .getRegionInfo().getEncodedName()); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList(); for (StoreSequenceId id : maxSeqIdInStores) { storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); } regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); } if (cachedLastFlushedSequenceId == null || lastFlushedSequenceId > cachedLastFlushedSequenceId) { lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); } } onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); return loc; }