/**
 * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
 * Stoppable, MasterServices, String, TaskFinisher)} that provides a task finisher for
 * copying recovered edits to their final destination. The task finisher
 * has to be robust because it can be arbitrarily restarted or called
 * multiple times.
 *
 * @param zkw the ZK watcher
 * @param conf the HBase configuration
 * @param stopper the stoppable in case anything is wrong
 * @param master the master services
 * @param serverName the master server name
 */
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
    Stoppable stopper, MasterServices master, String serverName) {
  this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
    @Override
    public Status finish(String workerName, String logfile) {
      try {
        HLogSplitter.finishSplitLogFile(logfile, conf);
      } catch (IOException e) {
        LOG.warn("Could not finish splitting of log file " + logfile, e);
        return Status.ERR;
      }
      return Status.DONE;
    }
  });
}
/**
 * Performs log splitting for all regionserver directories.
 * @throws Exception
 */
private void doOfflineLogSplitting() throws Exception {
  LOG.info("Starting Log splitting");
  final Path rootDir = FSUtils.getRootDir(getConf());
  final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
  Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
  FileStatus[] regionServerLogDirs = FSUtils.listStatus(fs, logDir);
  if (regionServerLogDirs == null || regionServerLogDirs.length == 0) {
    LOG.info("No log directories to split, returning");
    return;
  }
  try {
    for (FileStatus regionServerLogDir : regionServerLogDirs) {
      // split its log dir, if exists
      HLogSplitter.split(rootDir, regionServerLogDir.getPath(), oldLogDir, fs, getConf());
    }
    LOG.info("Successfully completed Log splitting");
  } catch (Exception e) {
    LOG.error("Got exception while doing Log splitting ", e);
    throw e;
  }
}
public MasterFileSystem(Server master, MasterServices services, boolean masterRecovery)
    throws IOException {
  this.conf = master.getConfiguration();
  this.master = master;
  this.services = services;
  // Set filesystem to be that of this.rootdir else we get complaints about
  // mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
  // default localfs. Presumption is that rootdir is fully-qualified before
  // we get to here with appropriate fs scheme.
  this.rootdir = FSUtils.getRootDir(conf);
  this.tempdir = new Path(this.rootdir, HConstants.HBASE_TEMP_DIRECTORY);
  // Cover both bases, the old way of setting default fs and the new.
  // We're supposed to run on 0.20 and 0.21 anyways.
  this.fs = this.rootdir.getFileSystem(conf);
  FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
  // make sure the fs has the same conf
  fs.setConf(conf);
  this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
  // setup the filesystem variable
  // set up the archived logs path
  this.oldLogDir = createInitialFileSystemLayout();
  HFileSystem.addLocationsOrderInterceptor(conf);
  this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
      master.getConfiguration(), master, services, master.getServerName(), masterRecovery);
}
ServerShutdownHandler(final Server server, final MasterServices services,
    final DeadServer deadServers, final ServerName serverName, EventType type,
    final boolean shouldSplitHlog) {
  super(server, type);
  this.serverName = serverName;
  this.server = server;
  this.services = services;
  this.deadServers = deadServers;
  if (!this.deadServers.isDeadServer(this.serverName)) {
    LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
  }
  this.shouldSplitHlog = shouldSplitHlog;
  this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(server.getConfiguration());
  this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
      HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
}
/**
 * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
 * Stoppable stopper, MasterServices master, ServerName serverName,
 * boolean masterRecovery, TaskFinisher tf)}
 * that provides a task finisher for copying recovered edits to their final destination.
 * The task finisher has to be robust because it can be arbitrarily restarted or called
 * multiple times.
 *
 * @param zkw the ZK watcher
 * @param conf the HBase configuration
 * @param stopper the stoppable in case anything is wrong
 * @param master the master services
 * @param serverName the master server name
 * @param masterRecovery an indication if the master is in recovery
 */
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, Stoppable stopper,
    MasterServices master, ServerName serverName, boolean masterRecovery) {
  this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
    @Override
    public Status finish(ServerName workerName, String logfile) {
      try {
        HLogSplitter.finishSplitLogFile(logfile, conf);
      } catch (IOException e) {
        LOG.warn("Could not finish splitting of log file " + logfile, e);
        return Status.ERR;
      }
      return Status.DONE;
    }
  });
}
/**
 * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
 * Stoppable, String, TaskFinisher)} that provides a task finisher for
 * copying recovered edits to their final destination. The task finisher
 * has to be robust because it can be arbitrarily restarted or called
 * multiple times.
 *
 * @param zkw the ZK watcher
 * @param conf the HBase configuration
 * @param stopper the stoppable in case anything is wrong
 * @param serverName the master server name
 */
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
    Stoppable stopper, String serverName) {
  this(zkw, conf, stopper, serverName, new TaskFinisher() {
    @Override
    public Status finish(String workerName, String logfile) {
      String tmpname = ZKSplitLog.getSplitLogDirTmpComponent(workerName, logfile);
      try {
        HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
      } catch (IOException e) {
        LOG.warn("Could not finish splitting of log file " + logfile, e);
        return Status.ERR;
      }
      return Status.DONE;
    }
  });
}
public MasterFileSystem(Server master, MasterServices services)
    throws IOException {
  this.conf = master.getConfiguration();
  this.master = master;
  this.services = services;
  // Set filesystem to be that of this.rootdir else we get complaints about
  // mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
  // default localfs. Presumption is that rootdir is fully-qualified before
  // we get to here with appropriate fs scheme.
  this.rootdir = FSUtils.getRootDir(conf);
  this.tempdir = new Path(this.rootdir, HConstants.HBASE_TEMP_DIRECTORY);
  // Cover both bases, the old way of setting default fs and the new.
  // We're supposed to run on 0.20 and 0.21 anyways.
  this.fs = this.rootdir.getFileSystem(conf);
  FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
  // make sure the fs has the same conf
  fs.setConf(conf);
  this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
  // setup the filesystem variable
  // set up the archived logs path
  this.oldLogDir = createInitialFileSystemLayout();
  HFileSystem.addLocationsOrderInterceptor(conf);
  this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
      master.getConfiguration(), master, services, master.getServerName());
}
/**
 * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
 * Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
 * that provides a task finisher for copying recovered edits to their final destination.
 * The task finisher has to be robust because it can be arbitrarily restarted or called
 * multiple times.
 *
 * @param zkw the ZK watcher
 * @param conf the HBase configuration
 * @param stopper the stoppable in case anything is wrong
 * @param master the master services
 * @param serverName the master server name
 */
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, Stoppable stopper,
    MasterServices master, ServerName serverName) {
  this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
    @Override
    public Status finish(ServerName workerName, String logfile) {
      try {
        HLogSplitter.finishSplitLogFile(logfile, conf);
      } catch (IOException e) {
        LOG.warn("Could not finish splitting of log file " + logfile, e);
        return Status.ERR;
      }
      return Status.DONE;
    }
  });
}
/**
 * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
 * Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
 * that provides a task finisher for copying recovered edits to their final destination.
 * The task finisher has to be robust because it can be arbitrarily restarted or called
 * multiple times.
 *
 * @param zkw the ZK watcher
 * @param conf the HBase configuration
 * @param stopper the stoppable in case anything is wrong
 * @param master the master services
 * @param serverName the master server name
 */
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, Stoppable stopper,
    MasterServices master, ServerName serverName) {
  this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
    @Override
    public Status finish(ServerName workerName, String logfile) {
      try {
        HLogSplitter.finishSplitLogFile(logfile, conf);
      } catch (IOException e) {
        LOG.warn("Could not finish splitting of log file " + logfile, e);
        return Status.ERR;
      }
      return Status.DONE;
    }
  });
}
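/*
 * A minimal sketch of a standalone TaskFinisher, e.g. for tests. The name "recordingFinisher"
 * and the recording set are hypothetical; only the TaskFinisher.finish(ServerName, String)
 * shape and the Status enum come from the constructors above. The property the wrappers rely
 * on is idempotence: finish() may be retried for a log that was already handled and must
 * still return DONE.
 */
TaskFinisher recordingFinisher = new TaskFinisher() {
  private final Set<String> finishedLogs =
      Collections.synchronizedSet(new HashSet<String>());

  @Override
  public Status finish(ServerName workerName, String logfile) {
    // Calling this again for an already-finished log is harmless: the set add is a no-op.
    finishedLogs.add(logfile);
    return Status.DONE;
  }
};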
private Path runWALSplit(final Configuration c) throws IOException {
  FileSystem fs = FileSystem.get(c);
  HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
      this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
  List<Path> splits = logSplitter.splitLog();
  // Split should generate only 1 file since there's only 1 region
  assertEquals(1, splits.size());
  // Make sure the file exists
  assertTrue(fs.exists(splits.get(0)));
  LOG.info("Split file=" + splits.get(0));
  return splits.get(0);
}
/**
 * It's OK to construct this object even when region servers are not online. It
 * does look up orphan tasks in ZK, but it doesn't block waiting for them
 * to be done.
 *
 * @param zkw the ZK watcher
 * @param conf the HBase configuration
 * @param stopper the stoppable in case anything is wrong
 * @param master the master services
 * @param serverName the master server name
 * @param masterRecovery an indication if the master is in recovery
 * @param tf task finisher
 */
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper,
    MasterServices master, ServerName serverName, boolean masterRecovery, TaskFinisher tf) {
  super(zkw);
  this.taskFinisher = tf;
  this.conf = conf;
  this.stopper = stopper;
  this.master = master;
  this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
  this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
  this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
  this.unassignedTimeout =
      conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
  this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
  LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout
      + ", distributedLogReplay=" + this.distributedLogReplay);
  this.serverName = serverName;
  this.timeoutMonitor = new TimeoutMonitor(
      conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
  this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
  if (!masterRecovery) {
    Threads.setDaemonThreadRunning(timeoutMonitor.getThread(),
        serverName + ".splitLogManagerTimeoutMonitor");
  }
  // Watcher can be null during tests with Mock'd servers.
  if (this.watcher != null) {
    this.watcher.registerListener(this);
    lookForOrphans();
  }
}
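/*
 * A minimal sketch of overriding the tunables read by the constructor above before the
 * SplitLogManager is created. Only the property names come from the constructor; the values
 * shown are illustrative, not recommendations.
 */
Configuration conf = HBaseConfiguration.create();
conf.setLong("hbase.splitlog.zk.retries", 3);                       // ZK operation retries
conf.setLong("hbase.splitlog.max.resubmit", 3);                     // max task resubmissions
conf.setInt("hbase.splitlog.manager.timeout", 120000);              // per-task timeout, ms
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 180000);   // unassigned task timeout, ms
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);  // monitor period, ms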
/**
 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
 * @param region the region to replay the mutations into
 * @param mutations the replayed WAL mutations
 * @return an array of OperationStatus which internally contains the OperationStatusCode and the
 *         exceptionMessage if any
 * @throws IOException
 */
protected OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<HLogSplitter.MutationReplay> mutations) throws IOException {
  HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
  long before = EnvironmentEdgeManager.currentTimeMillis();
  boolean batchContainsPuts = false, batchContainsDelete = false;
  try {
    int i = 0;
    for (HLogSplitter.MutationReplay m : mutations) {
      if (m.type == MutationType.PUT) {
        batchContainsPuts = true;
      } else {
        batchContainsDelete = true;
      }
      mArray[i++] = m;
    }
    requestCount.add(mutations.size());
    if (!region.getRegionInfo().isMetaTable()) {
      cacheFlusher.reclaimMemStoreMemory();
    }
    return region.batchReplay(mArray);
  } finally {
    long after = EnvironmentEdgeManager.currentTimeMillis();
    if (batchContainsPuts) {
      metricsRegionServer.updatePut(after - before);
    }
    if (batchContainsDelete) {
      metricsRegionServer.updateDelete(after - before);
    }
  }
}
private Path runWALSplit(final Configuration c) throws IOException {
  List<Path> splits = HLogSplitter.split(
      hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
  // Split should generate only 1 file since there's only 1 region
  assertEquals(1, splits.size());
  // Make sure the file exists
  assertTrue(fs.exists(splits.get(0)));
  LOG.info("Split file=" + splits.get(0));
  return splits.get(0);
}
/**
 * It's OK to construct this object even when region servers are not online. It
 * does look up orphan tasks in ZK, but it doesn't block waiting for them
 * to be done.
 *
 * @param zkw the ZK watcher
 * @param conf the HBase configuration
 * @param stopper the stoppable in case anything is wrong
 * @param master the master services
 * @param serverName the master server name
 * @param tf task finisher
 */
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper,
    MasterServices master, ServerName serverName, TaskFinisher tf) {
  super(zkw);
  this.taskFinisher = tf;
  this.conf = conf;
  this.stopper = stopper;
  this.master = master;
  this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
  this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
  this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
  this.unassignedTimeout =
      conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
  this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
  LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout
      + ", distributedLogReplay=" + this.distributedLogReplay);
  this.serverName = serverName;
  this.timeoutMonitor = new TimeoutMonitor(
      conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
  this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
  Threads.setDaemonThreadRunning(timeoutMonitor.getThread(),
      serverName + ".splitLogManagerTimeoutMonitor");
  // Watcher can be null during tests with Mock'd servers.
  if (this.watcher != null) {
    this.watcher.registerListener(this);
    lookForOrphans();
  }
}
/**
 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
 * @param region the region to replay the mutations into
 * @param mutations the replayed WAL mutations
 * @return an array of OperationStatus which internally contains the OperationStatusCode and the
 *         exceptionMessage if any
 * @throws IOException
 */
private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<HLogSplitter.MutationReplay> mutations) throws IOException {
  HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
  long before = EnvironmentEdgeManager.currentTimeMillis();
  boolean batchContainsPuts = false, batchContainsDelete = false;
  try {
    int i = 0;
    for (HLogSplitter.MutationReplay m : mutations) {
      if (m.type == MutationType.PUT) {
        batchContainsPuts = true;
      } else {
        batchContainsDelete = true;
      }
      mArray[i++] = m;
    }
    requestCount.add(mutations.size());
    if (!region.getRegionInfo().isMetaTable()) {
      regionServer.cacheFlusher.reclaimMemStoreMemory();
    }
    return region.batchReplay(mArray);
  } finally {
    if (regionServer.metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTimeMillis();
      if (batchContainsPuts) {
        regionServer.metricsRegionServer.updatePut(after - before);
      }
      if (batchContainsDelete) {
        regionServer.metricsRegionServer.updateDelete(after - before);
      }
    }
  }
}
private void initializeTHLog() throws IOException {
  // We keep it in the same directory as the core HLog.
  Path oldLogDir = new Path(getRootDir(), HLogSplitter.RECOVERED_EDITS);
  Path logdir = new Path(getRootDir(),
      HLog.getHLogDirectoryName(super.getServerName().getServerName()));
  trxHLog = new THLog(getFileSystem(), logdir, oldLogDir, conf, null);
}
/**
 * This method is the base split method that splits HLog files matching a filter.
 * Callers should pass the appropriate filter for meta and non-meta HLogs.
 * @param serverNames the dead region servers whose logs should be split
 * @param filter selects which HLog files under each server's log directory to split
 * @throws IOException
 */
public void splitLog(final List<ServerName> serverNames, PathFilter filter) throws IOException {
  long splitTime = 0, splitLogSize = 0;
  List<Path> logDirs = getLogDirs(serverNames);

  if (logDirs.isEmpty()) {
    LOG.info("No logs to split");
    return;
  }

  boolean lockAcquired = false;
  if (distributedLogSplitting) {
    try {
      if (!this.services.isServerShutdownHandlerEnabled()) {
        // Process one log-splitting task at a time before SSH is enabled, because
        // ROOT SSH and HMaster#assignMeta could both split logs for the same server.
        this.splitLogLock.lock();
        lockAcquired = true;
      }
      splitLogManager.handleDeadWorkers(serverNames);
      splitTime = EnvironmentEdgeManager.currentTimeMillis();
      splitLogSize = splitLogManager.splitLogDistributed(logDirs, filter);
      splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
    } finally {
      if (lockAcquired) {
        this.splitLogLock.unlock();
      }
    }
  } else {
    for (Path logDir : logDirs) {
      // splitLogLock ensures that dead region servers' logs are processed
      // one at a time
      this.splitLogLock.lock();
      try {
        HLogSplitter splitter = HLogSplitter.createLogSplitter(
            conf, rootdir, logDir, oldLogDir, this.fs);
        try {
          // If FS is in safe mode, just wait till out of it.
          FSUtils.waitOnSafeMode(conf,
              conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000));
          splitter.splitLog();
        } catch (OrphanHLogAfterSplitException e) {
          LOG.warn("Retrying splitting because of:", e);
          // An HLogSplitter instance can only be used once. Get new instance.
          splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir,
              oldLogDir, this.fs);
          splitter.splitLog();
        }
        splitTime = splitter.getTime();
        splitLogSize = splitter.getSize();
      } finally {
        this.splitLogLock.unlock();
      }
    }
  }

  if (this.metrics != null) {
    this.metrics.addSplit(splitTime, splitLogSize);
  }
}
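/*
 * A minimal sketch of the kind of PathFilter a caller might pass to splitLog(serverNames,
 * filter) above to restrict splitting to META WALs. The ".meta" suffix check is an assumption
 * used only for illustration; the real distribution ships its own meta/non-meta filters, and
 * callers should prefer those.
 */
PathFilter metaOnlyFilter = new PathFilter() {
  @Override
  public boolean accept(Path p) {
    // Assumed naming convention: WAL files carrying META edits end with ".meta".
    return p.getName().endsWith(".meta");
  }
};
// splitLog(serverNames, metaOnlyFilter);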
/**
 * Replay the given changes, i.e. WAL edits from a failed RS, when distributedLogReplay
 * is in use. The guarantee is that the given mutations will be durable on the receiving RS
 * if this method returns without any exception.
 * @param controller the RPC controller
 * @param request the request
 * @throws ServiceException
 */
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  long before = EnvironmentEdgeManager.currentTimeMillis();
  CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    HRegion region = this.getRegionByEncodedName(
        entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
    List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
    // when tags are enabled, we need to tag replay edits with the log sequence number
    boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3);
    for (WALEntry entry : entries) {
      if (nonceManager != null) {
        long nonceGroup = entry.getKey().hasNonceGroup()
            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
        long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
        nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
      }
      Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null
          : new Pair<HLogKey, WALEdit>();
      List<HLogSplitter.MutationReplay> edits =
          HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry, needAddReplayTag);
      if (coprocessorHost != null) {
        // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
        // KeyValue.
        if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
            walEntry.getSecond())) {
          // the coprocessor asked to bypass this log entry; skip it
          continue;
        }
        walEntries.add(walEntry);
      }
      mutations.addAll(edits);
    }

    if (!mutations.isEmpty()) {
      OperationStatus[] result = doReplayBatchOp(region, mutations);
      // check if it's a partial success
      for (int i = 0; result != null && i < result.length; i++) {
        if (result[i] != OperationStatus.SUCCESS) {
          throw new IOException(result[i].getExceptionMsg());
        }
      }
    }
    if (coprocessorHost != null) {
      for (Pair<HLogKey, WALEdit> wal : walEntries) {
        coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond());
      }
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
  }
}
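/*
 * The replay path above only attaches per-edit replay tags when HFiles are written in format
 * version 3 or later (needAddReplayTag). A minimal sketch, assuming the standard
 * "hfile.format.version" property is the knob HFile.getFormatVersion(conf) reads, of enabling
 * that on the cluster configuration:
 */
Configuration tagConf = HBaseConfiguration.create();
tagConf.setInt("hfile.format.version", 3);  // enable HFile v3 so replay edits can carry tags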
@Override
public void run() {
  try {
    LOG.info("SplitLogWorker " + this.serverName + " starting");
    this.watcher.registerListener(this);
    boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
    if (distributedLogReplay) {
      // initialize a new connection for splitlogworker configuration
      HConnectionManager.getConnection(conf);
    }

    // wait for master to create the splitLogZnode
    int res = -1;
    while (res == -1 && !exitWorker) {
      try {
        res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
      } catch (KeeperException e) {
        // ignore
        LOG.warn("Exception when checking for " + watcher.splitLogZNode
            + " ... retrying", e);
      }
      if (res == -1) {
        LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
        Thread.sleep(1000);
      }
    }

    if (!exitWorker) {
      taskLoop();
    }
  } catch (Throwable t) {
    if (ExceptionUtil.isInterrupt(t)) {
      LOG.info("SplitLogWorker interrupted. Exiting. "
          + (exitWorker ? "" : " (ERROR: exitWorker is not set, exiting anyway)"));
    } else {
      // only a logical error can get us here. Print it out
      // to make debugging easier
      LOG.error("unexpected error ", t);
    }
  } finally {
    LOG.info("SplitLogWorker " + this.serverName + " exiting");
  }
}
/**
 * Replay the given changes, i.e. WAL edits from a failed RS, when distributedLogReplay
 * is in use. The guarantee is that the given mutations will be durable on the receiving RS
 * if this method returns without any exception.
 * @param controller the RPC controller
 * @param request the request
 * @throws ServiceException
 */
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  long before = EnvironmentEdgeManager.currentTimeMillis();
  CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    HRegion region = regionServer.getRegionByEncodedName(
        entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
    List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
    // when tags are enabled, we need to tag replay edits with the log sequence number
    boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
    for (WALEntry entry : entries) {
      if (regionServer.nonceManager != null) {
        long nonceGroup = entry.getKey().hasNonceGroup()
            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
        long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
        regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
      }
      Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null
          : new Pair<HLogKey, WALEdit>();
      List<HLogSplitter.MutationReplay> edits =
          HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry, needAddReplayTag);
      if (coprocessorHost != null) {
        // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
        // KeyValue.
        if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
            walEntry.getSecond())) {
          // the coprocessor asked to bypass this log entry; skip it
          continue;
        }
        walEntries.add(walEntry);
      }
      mutations.addAll(edits);
    }

    if (!mutations.isEmpty()) {
      OperationStatus[] result = doReplayBatchOp(region, mutations);
      // check if it's a partial success
      for (int i = 0; result != null && i < result.length; i++) {
        if (result[i] != OperationStatus.SUCCESS) {
          throw new IOException(result[i].getExceptionMsg());
        }
      }
    }
    if (coprocessorHost != null) {
      for (Pair<HLogKey, WALEdit> wal : walEntries) {
        coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond());
      }
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    if (regionServer.metricsRegionServer != null) {
      regionServer.metricsRegionServer.updateReplay(
          EnvironmentEdgeManager.currentTimeMillis() - before);
    }
  }
}
/**
 * Replay the given changes, i.e. WAL edits from a failed RS, when distributedLogReplay
 * is in use. The guarantee is that the given mutations will be durable on the receiving RS
 * if this method returns without any exception.
 * @param controller the RPC controller
 * @param request the request
 * @throws ServiceException
 */
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  long before = EnvironmentEdgeManager.currentTimeMillis();
  CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    HRegion region = this.getRegionByEncodedName(
        entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
    List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
    for (WALEntry entry : entries) {
      Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null
          : new Pair<HLogKey, WALEdit>();
      List<Pair<MutationType, Mutation>> edits =
          HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry);
      if (coprocessorHost != null) {
        // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
        // KeyValue.
        if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
            walEntry.getSecond())) {
          // the coprocessor asked to bypass this log entry; skip it
          continue;
        }
        walEntries.add(walEntry);
      }
      mutations.addAll(edits);
    }

    if (!mutations.isEmpty()) {
      OperationStatus[] result = doBatchOp(region, mutations, true);
      // check if it's a partial success
      for (int i = 0; result != null && i < result.length; i++) {
        if (result[i] != OperationStatus.SUCCESS) {
          throw new IOException(result[i].getExceptionMsg());
        }
      }
    }
    if (coprocessorHost != null) {
      for (Pair<HLogKey, WALEdit> wal : walEntries) {
        coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond());
      }
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
  }
}