ServerManager(final Server master, final MasterServices services, final boolean connect) throws IOException { this.master = master; this.services = services; Configuration c = master.getConfiguration(); maxSkew = c.getLong("hbase.master.maxclockskew", 30000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000); this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null; int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt( "hbase.master.maximum.ping.server.attempts", 10)); int pingSleepInterval = Math.max(1, master.getConfiguration().getInt( "hbase.master.ping.server.retry.sleep.interval", 100)); this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval); this.rpcControllerFactory = this.connection == null ? null : connection.getRpcControllerFactory(); }
@Override public void setConf(Configuration conf) { super.setConf(conf); if (conf == null) { // Configured gets passed null before real conf. Why? I don't know. return; } sshUserName = conf.get("hbase.it.clustermanager.ssh.user", ""); String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", ""); sshOptions = System.getenv("HBASE_SSH_OPTS"); if (!extraSshOptions.isEmpty()) { sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " "); } sshOptions = (sshOptions == null) ? "" : sshOptions; tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD); // Print out ssh special config if any. if ((sshUserName != null && sshUserName.length() > 0) || (sshOptions != null && sshOptions.length() > 0)) { LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]"); } this.retryCounterFactory = new RetryCounterFactory(new RetryConfig() .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS)) .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL))); }
@Override public void setConf(Configuration conf) { super.setConf(conf); if (conf == null) { // Configured gets passed null before real conf. Why? I don't know. return; } sshUserName = conf.get("hbase.it.clustermanager.ssh.user", ""); String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", ""); sshOptions = System.getenv("HBASE_SSH_OPTS"); if (!extraSshOptions.isEmpty()) { sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " "); } sshOptions = (sshOptions == null) ? "" : sshOptions; sshUserName = (sshUserName == null) ? "" : sshUserName; tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD); // Print out ssh special config if any. if ((sshUserName != null && sshUserName.length() > 0) || (sshOptions != null && sshOptions.length() > 0)) { LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]"); } this.retryCounterFactory = new RetryCounterFactory(new RetryConfig() .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS)) .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL))); }
ServerManager(final Server master, final MasterServices services, final boolean connect) throws IOException { this.master = master; this.services = services; Configuration c = master.getConfiguration(); maxSkew = c.getLong("hbase.master.maxclockskew", 30000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000); this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null; int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt( "hbase.master.maximum.ping.server.attempts", 10)); int pingSleepInterval = Math.max(1, master.getConfiguration().getInt( "hbase.master.ping.server.retry.sleep.interval", 100)); this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval); }
void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException { long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); int maxAttempts = getRetriesCount(connection.getConfiguration()); RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create(); if (LOG.isDebugEnabled()) { LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " + region.getRegionInfo().getEncodedName() + " to trigger a flush"); } while (!region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped()) { FlushRegionCallable flushCallable = new FlushRegionCallable( connection, rpcControllerFactory, RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true); // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we // do not have to wait for the whole flush here, just initiate it. FlushRegionResponse response = null; try { response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller() .callWithRetries(flushCallable, this.operationTimeout); } catch (IOException ex) { if (ex instanceof TableNotFoundException || connection.isTableDisabled(region.getRegionInfo().getTable())) { return; } throw ex; } if (response.getFlushed()) { // then we have to wait for seeing the flush entry. All reads will be rejected until we see // a complete flush cycle or replay a region open event if (LOG.isDebugEnabled()) { LOG.debug("Successfully triggered a flush of primary region replica " + ServerRegionReplicaUtil .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and blocking reads until observing a full flush cycle"); } break; } else { if (response.hasWroteFlushWalMarker()) { if(response.getWroteFlushWalMarker()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " + "region replica " + ServerRegionReplicaUtil .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and " + "blocking reads until observing a flush marker"); } break; } else { // somehow we were not able to get the primary to write the flush request. It may be // closing or already flushing. Retry flush again after some sleep. if (!counter.shouldRetry()) { throw new IOException("Cannot cause primary to flush or drop a wal marker after " + "retries. Failing opening of this region replica " + region.getRegionInfo().getEncodedName()); } } } else { // nothing to do. Are we dealing with an old server? LOG.warn("Was not able to trigger a flush from primary region due to old server version? " + "Continuing to open the secondary region replica: " + region.getRegionInfo().getEncodedName()); region.setReadsEnabled(true); break; } } try { counter.sleepUntilNextRetry(); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } } }