public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors, EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) { super(controller, entryBuffers, numWriters); this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout); this.tableDescriptors = tableDescriptors; // A cache for the table "memstore replication enabled" flag. // It has a default expiry of 5 sec. This means that if the table is altered // with a different flag value, we might miss to replicate for that amount of // time. But this cache avoid the slow lookup and parsing of the TableDescriptor. int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration() .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000); this.memstoreReplicationEnabled = CacheBuilder.newBuilder() .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS) .initialCapacity(10) .maximumSize(1000) .build(); }
@Override public void init(Context context) throws IOException { super.init(context); this.conf = HBaseConfiguration.create(context.getConfiguration()); this.tableDescriptors = context.getTableDescriptors(); // HRS multiplies client retries by 10 globally for meta operations, but we do not want this. // We are resetting it here because we want default number of retries (35) rather than 10 times // that which makes very long retries for disabled tables etc. int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); if (defaultNumRetries > 10) { int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10); defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already } conf.setInt("hbase.client.serverside.retries.multiplier", 1); int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); this.numWriterThreads = this.conf.getInt( "hbase.region.replica.replication.writer.threads", 3); controller = new PipelineController(); entryBuffers = new EntryBuffers(controller, this.conf.getInt("hbase.region.replica.replication.buffersize", 128*1024*1024)); // use the regular RPC timeout for replica replication RPC's this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); }
@Override public void init(Context context) throws IOException { super.init(context); this.conf = HBaseConfiguration.create(context.getConfiguration()); this.tableDescriptors = context.getTableDescriptors(); // HRS multiplies client retries by 10 globally for meta operations, but we do not want this. // We are resetting it here because we want default number of retries (35) rather than 10 times // that which makes very long retries for disabled tables etc. int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); if (defaultNumRetries > 10) { int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already } conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); this.numWriterThreads = this.conf.getInt( "hbase.region.replica.replication.writer.threads", 3); controller = new PipelineController(); entryBuffers = new EntryBuffers(controller, this.conf.getInt("hbase.region.replica.replication.buffersize", 128*1024*1024)); // use the regular RPC timeout for replica replication RPC's this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); }