/**
 * Splits every WAL file listed under {@code logDir} and then removes {@code logDir}.
 *
 * <p>A new {@code WALSplitter} in {@code LOG_SPLITTING} mode is created per file. When a file
 * is reported as split, it is passed to {@code finishSplitLogFile} and any output paths recorded
 * by that splitter's output sink are added to the result.
 *
 * @param rootDir root directory handed to each splitter and to {@code finishSplitLogFile}
 * @param logDir directory whose WAL files are split; deleted recursively afterwards
 * @param oldLogDir directory handed to {@code finishSplitLogFile}
 * @param fs file system used by the splitters and for the final delete
 * @param conf configuration used for file listing and splitting
 * @param factory WAL factory handed to each splitter
 * @return the output paths collected from all successful splits (possibly empty, never null)
 * @throws IOException if splitting fails or {@code logDir} cannot be deleted
 */
public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir, FileSystem fs,
    Configuration conf, final WALFactory factory) throws IOException {
  final FileStatus[] logfiles =
      SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
  final List<Path> collected = new ArrayList<Path>();

  // A null listing means there is nothing to split; an empty array simply skips the loop.
  if (logfiles != null) {
    for (int i = 0; i < logfiles.length; i++) {
      final FileStatus walFile = logfiles[i];
      final WALSplitter splitter =
          new WALSplitter(factory, conf, rootDir, fs, null, null, RecoveryMode.LOG_SPLITTING);
      if (!splitter.splitLogFile(walFile, null)) {
        // Not split: leave the file alone and move on to the next one.
        continue;
      }
      finishSplitLogFile(rootDir, oldLogDir, walFile.getPath(), conf);
      if (splitter.outputSink.splits != null) {
        collected.addAll(splitter.outputSink.splits);
      }
    }
  }

  // The source directory is removed whether or not any file was split.
  final boolean removed = fs.delete(logDir, true);
  if (!removed) {
    throw new IOException("Unable to delete src dir: " + logDir);
  }
  return collected;
}
/**
 * Splits every WAL file listed under {@code logDir} and then removes {@code logDir}.
 *
 * <p>A new {@code WALSplitter} is created per file. When a file is reported as split, it is
 * passed to {@code finishSplitLogFile} and any output paths recorded by that splitter's output
 * sink are added to the result.
 *
 * @param rootDir root directory handed to each splitter and to {@code finishSplitLogFile}
 * @param logDir directory whose WAL files are split; deleted recursively afterwards
 * @param oldLogDir directory handed to {@code finishSplitLogFile}
 * @param fs file system used by the splitters and for the final delete
 * @param conf configuration used for file listing and splitting
 * @param factory WAL factory handed to each splitter
 * @return the output paths collected from all successful splits (possibly empty, never null)
 * @throws IOException if splitting fails or {@code logDir} cannot be deleted
 */
@VisibleForTesting
public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir, FileSystem fs,
    Configuration conf, final WALFactory factory) throws IOException {
  final FileStatus[] logfiles =
      SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
  final List<Path> collected = new ArrayList<>();

  if (ArrayUtils.isNotEmpty(logfiles)) {
    for (FileStatus walFile : logfiles) {
      final WALSplitter splitter = new WALSplitter(factory, conf, rootDir, fs, null, null);
      final boolean wasSplit = splitter.splitLogFile(walFile, null);
      if (!wasSplit) {
        // Not split: leave the file alone and move on to the next one.
        continue;
      }
      finishSplitLogFile(rootDir, oldLogDir, walFile.getPath(), conf);
      if (splitter.outputSink.splits != null) {
        collected.addAll(splitter.outputSink.splits);
      }
    }
  }

  // The source directory is removed whether or not any file was split.
  final boolean removed = fs.delete(logDir, true);
  if (!removed) {
    throw new IOException("Unable to delete src dir: " + logDir);
  }
  return collected;
}
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) { super(watcher); this.server = server; this.serverName = server.getServerName(); this.splitTaskExecutor = splitTaskExecutor; report_period = conf.getInt("hbase.splitlog.report.period", conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); this.conf = conf; this.executorService = this.server.getExecutorService(); this.maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS); }
/** * A helper function to store the last flushed sequence Id with the previous failed RS for a * recovering region. The Id is used to skip wal edits which are flushed. Since the flushed * sequence id is only valid for each RS, we associate the Id with corresponding failed RS. * @throws KeeperException * @throws IOException */ private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException, IOException { if (!r.isRecovering()) { // return immdiately for non-recovering regions return; } HRegionInfo region = r.getRegionInfo(); ZooKeeperWatcher zkw = getZooKeeper(); String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName()); Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay(); long minSeqIdForLogReplay = -1; for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) { if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) { minSeqIdForLogReplay = storeSeqIdForReplay; } } long lastRecordedFlushedSequenceId = -1; String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, region.getEncodedName()); // recovering-region level byte[] data = ZKUtil.getData(zkw, nodePath); if (data != null) { lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data); } if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) { ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay)); } if (previousRSName != null) { // one level deeper for the failed RS nodePath = ZKUtil.joinZNode(nodePath, previousRSName); ZKUtil.setData(zkw, nodePath, ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores)); LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " + previousRSName); } else { LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName()); } }
public LogReplayOutputSink(int numWriters) { super(numWriters); this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT); this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters); this.logRecoveredEditsOutputSink.setReporter(reporter); }
/** * Locate destination region based on table name & row. This function also makes sure the * destination region is online for replay. * @throws IOException */ private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn, TableName table, byte[] row, String originalEncodedRegionName) throws IOException { // fetch location from cache HRegionLocation loc = onlineRegions.get(originalEncodedRegionName); if(loc != null) return loc; // fetch location from hbase:meta directly without using cache to avoid hit old dead server loc = hconn.getRegionLocation(table, row, true); if (loc == null) { throw new IOException("Can't locate location for row:" + Bytes.toString(row) + " of table:" + table); } // check if current row moves to a different region due to region merge/split if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) { // originalEncodedRegionName should have already flushed lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE); HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName()); if (tmpLoc != null) return tmpLoc; } Long lastFlushedSequenceId = -1l; AtomicBoolean isRecovering = new AtomicBoolean(true); loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering); if (!isRecovering.get()) { // region isn't in recovering at all because WAL file may contain a region that has // been moved to somewhere before hosting RS fails lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE); LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName() + " because it's not in recovering."); } else { Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName()); // retrieve last flushed sequence Id from ZK. 
Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc .getRegionInfo().getEncodedName()); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList(); for (StoreSequenceId id : maxSeqIdInStores) { storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId()); } regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds); } if (cachedLastFlushedSequenceId == null || lastFlushedSequenceId > cachedLastFlushedSequenceId) { lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId); } } onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc); return loc; }
/** * A helper function to store the last flushed sequence Id with the previous failed RS for a * recovering region. The Id is used to skip wal edits which are flushed. Since the flushed * sequence id is only valid for each RS, we associate the Id with corresponding failed RS. * @throws KeeperException * @throws IOException */ private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException, IOException { if (!r.isRecovering()) { // return immdiately for non-recovering regions return; } HRegionInfo region = r.getRegionInfo(); ZooKeeperWatcher zkw = getZooKeeper(); String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName()); Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay(); long minSeqIdForLogReplay = -1; for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) { if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) { minSeqIdForLogReplay = storeSeqIdForReplay; } } long lastRecordedFlushedSequenceId = -1; String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, region.getEncodedName()); // recovering-region level byte[] data; try { data = ZKUtil.getData(zkw, nodePath); } catch (InterruptedException e) { throw new InterruptedIOException(); } if (data != null) { lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data); } if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) { ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay)); } if (previousRSName != null) { // one level deeper for the failed RS nodePath = ZKUtil.joinZNode(nodePath, previousRSName); ZKUtil.setData(zkw, nodePath, ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores)); LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " + previousRSName); } else { LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName()); } }
/** * A helper function to store the last flushed sequence Id with the previous failed RS for a * recovering region. The Id is used to skip wal edits which are flushed. Since the flushed * sequence id is only valid for each RS, we associate the Id with corresponding failed RS. * @throws KeeperException * @throws IOException */ private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException, IOException { if (!r.isRecovering()) { // return immdiately for non-recovering regions return; } HRegionInfo region = r.getRegionInfo(); ZooKeeperWatcher zkw = getZooKeeper(); String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName()); Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay(); long minSeqIdForLogReplay = -1; for (byte[] columnFamily : maxSeqIdInStores.keySet()) { Long storeSeqIdForReplay = maxSeqIdInStores.get(columnFamily); if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) { minSeqIdForLogReplay = storeSeqIdForReplay; } } long lastRecordedFlushedSequenceId = -1; String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, region.getEncodedName()); // recovering-region level byte[] data = ZKUtil.getData(zkw, nodePath); if (data != null) { lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data); } if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) { ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay)); } if (previousRSName != null) { // one level deeper for the failed RS nodePath = ZKUtil.joinZNode(nodePath, previousRSName); ZKUtil.setData(zkw, nodePath, ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores)); LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " + previousRSName); } else { LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName()); } }
public DistributedLogSplittingHelper(CancelableProgressable reporter) { this.splitReporter = reporter; report_period = conf.getInt("hbase.splitlog.report.period", conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 2); }