/**
 * Serializes a region's flushed sequence id, together with the per-store sequence ids, into a
 * {@code RegionStoreSequenceIds} protobuf message prefixed with the pb magic bytes.
 *
 * @param regionLastFlushedSequenceId the flushed sequence id of a region which is the min of its
 *          store max seq ids
 * @param storeSequenceIds column family to sequence id map; when {@code null} no per-store
 *          entries are written
 * @return the serialized {@code RegionStoreSequenceIds} with the pb magic prefix prepended,
 *         suitable for use to filter wal edits in distributedLogReplay mode
 */
public static byte[] regionSequenceIdsToByteArray(final Long regionLastFlushedSequenceId,
    final Map<byte[], Long> storeSequenceIds) {
  ZooKeeperProtos.RegionStoreSequenceIds.Builder regionIds =
      ZooKeeperProtos.RegionStoreSequenceIds.newBuilder();

  if (storeSequenceIds != null) {
    // One StoreSequenceId message per column family.
    for (Map.Entry<byte[], Long> entry : storeSequenceIds.entrySet()) {
      regionIds.addStoreSequenceId(
          ZooKeeperProtos.StoreSequenceId.newBuilder()
              .setFamilyName(ByteStringer.wrap(entry.getKey()))
              .setSequenceId(entry.getValue())
              .build());
    }
  }

  regionIds.setLastFlushedSequenceId(regionLastFlushedSequenceId);
  return ProtobufUtil.prependPBMagic(regionIds.build().toByteArray());
}
/**
 * Parses a pb-magic-prefixed byte array back into a {@code RegionStoreSequenceIds} message.
 *
 * @param bytes Content of serialized data of RegionStoreSequenceIds
 * @return a RegionStoreSequenceIds object
 * @throws DeserializationException if {@code bytes} is null, does not start with the pb magic
 *           prefix, or the payload after the prefix is not a valid protobuf message
 */
public static RegionStoreSequenceIds parseRegionStoreSequenceIds(final byte[] bytes)
    throws DeserializationException {
  final boolean hasMagicPrefix = bytes != null && ProtobufUtil.isPBMagicPrefix(bytes);
  if (!hasMagicPrefix) {
    throw new DeserializationException("Unable to parse RegionStoreSequenceIds.");
  }

  // The protobuf payload starts right after the magic prefix.
  final int magicLength = ProtobufUtil.lengthOfPBMagic();
  try {
    return ZooKeeperProtos.RegionStoreSequenceIds.newBuilder()
        .mergeFrom(bytes, magicLength, bytes.length - magicLength)
        .build();
  } catch (InvalidProtocolBufferException e) {
    throw new DeserializationException(e);
  }
}
/**
 * Serializes a region's flushed sequence id, together with the per-store sequence ids, into a
 * {@code RegionStoreSequenceIds} protobuf message prefixed with the pb magic bytes.
 *
 * @param regionLastFlushedSequenceId the flushed sequence id of a region which is the min of its
 *          store max seq ids
 * @param storeSequenceIds column family to sequence id map; when {@code null} no per-store
 *          entries are written
 * @return the serialized {@code RegionStoreSequenceIds} with the pb magic prefix prepended,
 *         suitable for use to filter wal edits in distributedLogReplay mode
 */
public static byte[] regionSequenceIdsToByteArray(final Long regionLastFlushedSequenceId,
    final Map<byte[], Long> storeSequenceIds) {
  ZooKeeperProtos.RegionStoreSequenceIds.Builder regionIds =
      ZooKeeperProtos.RegionStoreSequenceIds.newBuilder();

  if (storeSequenceIds != null) {
    // One StoreSequenceId message per column family.
    for (Map.Entry<byte[], Long> entry : storeSequenceIds.entrySet()) {
      regionIds.addStoreSequenceId(
          ZooKeeperProtos.StoreSequenceId.newBuilder()
              .setFamilyName(HBaseZeroCopyByteString.wrap(entry.getKey()))
              .setSequenceId(entry.getValue())
              .build());
    }
  }

  regionIds.setLastFlushedSequenceId(regionLastFlushedSequenceId);
  return ProtobufUtil.prependPBMagic(regionIds.build().toByteArray());
}
/**
 * Serializes a region's flushed sequence id, together with the per-store sequence ids, into a
 * {@code RegionStoreSequenceIds} protobuf message prefixed with the pb magic bytes.
 *
 * @param regionLastFlushedSequenceId the flushed sequence id of a region which is the min of its
 *          store max seq ids
 * @param storeSequenceIds column family to sequence id map; when {@code null} no per-store
 *          entries are written
 * @return the serialized {@code RegionStoreSequenceIds} with the pb magic prefix prepended,
 *         suitable for use to filter wal edits in distributedLogReplay mode
 */
public static byte[] regionSequenceIdsToByteArray(final Long regionLastFlushedSequenceId,
    final Map<byte[], Long> storeSequenceIds) {
  ZooKeeperProtos.RegionStoreSequenceIds.Builder regionIds =
      ZooKeeperProtos.RegionStoreSequenceIds.newBuilder();

  if (storeSequenceIds != null) {
    // One StoreSequenceId message per column family.
    for (Map.Entry<byte[], Long> entry : storeSequenceIds.entrySet()) {
      regionIds.addStoreSequenceId(
          ZooKeeperProtos.StoreSequenceId.newBuilder()
              .setFamilyName(ZeroCopyLiteralByteString.wrap(entry.getKey()))
              .setSequenceId(entry.getValue())
              .build());
    }
  }

  regionIds.setLastFlushedSequenceId(regionLastFlushedSequenceId);
  return ProtobufUtil.prependPBMagic(regionIds.build().toByteArray());
}
/**
 * Looks up the flushed sequence ids for the given failed server and key by delegating to
 * {@code ZKSplitLog.getRegionFlushedSequenceId} with this instance's watcher.
 *
 * @param failedServerName name of the failed server, passed through unchanged
 * @param key lookup key, passed through unchanged
 * @return whatever the delegate returns
 * @throws IOException if the delegate throws it
 */
@Override
public RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key)
    throws IOException {
  final RegionStoreSequenceIds flushedIds =
      ZKSplitLog.getRegionFlushedSequenceId(watcher, failedServerName, key);
  return flushedIds;
}
/**
 * Returns the flushed sequence ids of a region, looked up by failed server name and key.
 *
 * @param failedServerName name of the failed server the lookup is scoped to
 * @param key identifies the region whose ids are requested. NOTE(review): presumably the
 *          encoded region name; confirm against callers.
 * @return the {@code RegionStoreSequenceIds} found for the key. NOTE(review): whether this can
 *         be {@code null} when nothing is recorded is not stated here; confirm with the
 *         implementations.
 * @throws IOException declared for implementations whose lookup can fail
 */
RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key) throws IOException;
/** * Locate destination region based on table name & row. This function also makes sure the * destination region is online for replay. * @throws IOException */ private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, TableName table, byte[] row, String originalEncodedRegionName) throws IOException { // fetch location from cache HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); if(loc != null) return loc; // fetch location from hbase:meta directly without using cache to avoid hit old dead server loc = hconn.getRegionLocation(table, row, true); if (loc == null) { throw new IOException("Can't locate location for row:" + Bytes.toString(row) + " of table:" + table); } // check if current row moves to a different region due to region merge/split if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { // originalEncodedRegionName should have already flushed lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); if (tmpLoc != null) return tmpLoc; } Long lastFlushedSequenceId = -1l; AtomicBoolean isRecovering = new AtomicBoolean(true); loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); if (!isRecovering.get()) { // region isn't in recovering at all because WAL file may contain a region that has // been moved to somewhere before hosting RS fails lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + " because it's not in recovering."); } else { Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); // retrieve last flushed sequence Id from ZK. 
Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, loc.getRegionInfo().getEncodedName()); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList(); for (StoreSequenceId id : maxSeqIdInStores) { storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); } regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); } if (cachedLastFlushedSequenceId == null || lastFlushedSequenceId > cachedLastFlushedSequenceId) { lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); } } onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); return loc; }
/** * Locate destination region based on table name & row. This function also makes sure the * destination region is online for replay. * @throws IOException */ private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, TableName table, byte[] row, String originalEncodedRegionName) throws IOException { // fetch location from cache HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); if(loc != null) return loc; // fetch location from hbase:meta directly without using cache to avoid hit old dead server loc = hconn.getRegionLocation(table, row, true); if (loc == null) { throw new IOException("Can't locate location for row:" + Bytes.toString(row) + " of table:" + table); } // check if current row moves to a different region due to region merge/split if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { // originalEncodedRegionName should have already flushed lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); if (tmpLoc != null) return tmpLoc; } Long lastFlushedSequenceId = -1l; AtomicBoolean isRecovering = new AtomicBoolean(true); loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); if (!isRecovering.get()) { // region isn't in recovering at all because WAL file may contain a region that has // been moved to somewhere before hosting RS fails lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + " because it's not in recovering."); } else { Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); // retrieve last flushed sequence Id from ZK. 
Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc .getRegionInfo().getEncodedName()); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList(); for (StoreSequenceId id : maxSeqIdInStores) { storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); } regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); } if (cachedLastFlushedSequenceId == null || lastFlushedSequenceId > cachedLastFlushedSequenceId) { lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); } } onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); return loc; }