Example source code for the Java class org.apache.hadoop.hbase.replication.WALEntryFilter

Project: ditb    File: RegionReplicaReplicationEndpoint.java
@Override
public WALEntryFilter getWALEntryfilter() {
  WALEntryFilter superFilter = super.getWALEntryfilter();
  WALEntryFilter skipReplayedEditsFilter = getSkipReplayedEditsFilter();

  if (superFilter == null) {
    return skipReplayedEditsFilter;
  }

  if (skipReplayedEditsFilter == null) {
    return superFilter;
  }

  ArrayList<WALEntryFilter> filters = Lists.newArrayList();
  filters.add(superFilter);
  filters.add(skipReplayedEditsFilter);
  return new ChainWALEntryFilter(filters);
}
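
A ChainWALEntryFilter runs its member filters in order, and an entry survives only if every filter passes it through; a null return from any filter drops the entry. The following is a minimal sketch of that composition logic, not the actual HBase implementation:

static Entry applyChain(List<WALEntryFilter> filters, Entry entry) {
  for (WALEntryFilter f : filters) {
    entry = f.filter(entry);
    if (entry == null) {
      return null; // a null result short-circuits the chain: the entry is dropped
    }
  }
  return entry;
}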
Project: hbase    File: ReplicationSource.java
private void initializeWALEntryFilter() {
  // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
  ArrayList<WALEntryFilter> filters =
    Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
  WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
  if (filterFromEndpoint != null) {
    filters.add(filterFromEndpoint);
  }
  filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
  this.walEntryFilter = new ChainWALEntryFilter(filters);
}
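
A custom ReplicationEndpoint contributes to this chain by overriding getWALEntryfilter(). Below is a hedged sketch of such an endpoint; the class name and the shouldSkip predicate are hypothetical, and a real implementation would inspect the entry's key or edits:

public class ExampleFilteringEndpoint extends HBaseInterClusterReplicationEndpoint {
  @Override
  public WALEntryFilter getWALEntryfilter() {
    return new WALEntryFilter() {
      @Override
      public Entry filter(Entry entry) {
        // shouldSkip is a hypothetical predicate; returning null drops the entry
        return shouldSkip(entry) ? null : entry;
      }
    };
  }

  private boolean shouldSkip(Entry entry) {
    return false; // placeholder; real logic would examine entry.getKey()
  }
}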
Project: hbase    File: ReplicationSourceWALReader.java
/**
 * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
 * entries, and puts them on a batch queue.
 * @param fs the file system to use
 * @param conf configuration to use
 * @param logQueue the WAL queue to read from
 * @param startPosition position in the first WAL to start reading from
 * @param filter the filter to apply while reading
 * @param source replication source
 */
public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
    PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
    ReplicationSource source) {
  this.logQueue = logQueue;
  this.currentPosition = startPosition;
  this.fs = fs;
  this.conf = conf;
  this.filter = filter;
  this.source = source;
  this.replicationBatchSizeCapacity =
      this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
  this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
  // memory used will be batchSizeCapacity * (nb.batches + 1)
  // the +1 is for the current thread reading before placing onto the queue
  int batchCount = conf.getInt("replication.source.nb.batches", 1);
  this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
  this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
    HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
  this.sleepForRetries =
      this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
  this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
  this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
  this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
  LOG.info("peerClusterZnode=" + source.getQueueId()
      + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
      + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
      + ", replicationBatchQueueCapacity=" + batchCount);
}
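
The batching limits above all come from the Configuration, so they can be tuned without code changes. Here is a hedged sketch of setting them programmatically; the values are illustrative, not recommendations:

Configuration conf = HBaseConfiguration.create();
// cap each replication batch at 32 MB instead of the 64 MB default
conf.setLong("replication.source.size.capacity", 1024 * 1024 * 32);
// cap each batch at 10000 entries instead of the 25000 default
conf.setInt("replication.source.nb.capacity", 10000);
// let the reader buffer three batches ahead of the shipper thread
conf.setInt("replication.source.nb.batches", 3);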
Project: hbase    File: TestWALEntryStream.java
private WALEntryFilter getDummyFilter() {
  return new WALEntryFilter() {

    @Override
    public Entry filter(Entry entry) {
      return entry;
    }
  };
}
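
The pass-through filter above accepts every entry. For contrast, a filter that suppresses everything simply returns null; the following test-helper sketch is hypothetical and not part of TestWALEntryStream:

private WALEntryFilter getDropAllFilter() {
  return new WALEntryFilter() {

    @Override
    public Entry filter(Entry entry) {
      return null; // returning null filters the entry out entirely
    }
  };
}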
Project: ditb    File: ReplicationSource.java
@Override
public void run() {
  // mark we are running now
  this.sourceRunning = true;
  try {
    // start the endpoint, connect to the cluster
    Service.State state = replicationEndpoint.start().get();
    if (state != Service.State.RUNNING) {
      LOG.warn("ReplicationEndpoint was not started. Exiting");
      uninitialize();
      return;
    }
  } catch (Exception ex) {
    LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
    throw new RuntimeException(ex);
  }

  // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
  ArrayList<WALEntryFilter> filters = Lists.newArrayList(
    (WALEntryFilter)new SystemTableWALEntryFilter());
  WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
  if (filterFromEndpoint != null) {
    filters.add(filterFromEndpoint);
  }
  this.walEntryFilter = new ChainWALEntryFilter(filters);

  int sleepMultiplier = 1;
  // delay this until we are in an asynchronous thread
  while (this.isSourceActive() && this.peerClusterId == null) {
    this.peerClusterId = replicationEndpoint.getPeerUUID();
    if (this.isSourceActive() && this.peerClusterId == null) {
      if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
        sleepMultiplier++;
      }
    }
  }

  // In rare cases, the ZooKeeper setup may be messed up, leading to an incorrect
  // peerClusterId value, i.e. one equal to the source clusterId
  if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
    this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
        + peerClusterId + " which is not allowed by ReplicationEndpoint:"
        + replicationEndpoint.getClass().getName(), null, false);
  }
  LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
  // start workers
  for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
    String walGroupId = entry.getKey();
    PriorityBlockingQueue<Path> queue = entry.getValue();
    final ReplicationSourceWorkerThread worker =
        new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
    ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
    } else {
      LOG.debug("Starting up worker for wal group " + walGroupId);
      worker.startup();
    }
  }
}
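
The peer-discovery loop above leans on a sleepForRetries helper for bounded linear backoff. A simplified sketch of its typical shape in ReplicationSource follows; it is a paraphrase, not the verbatim implementation:

protected boolean sleepForRetries(String msg, int sleepMultiplier) {
  try {
    Thread.sleep(this.sleepForRetries * sleepMultiplier);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
  // keep growing the multiplier only until the configured ceiling is reached
  return sleepMultiplier < maxRetriesMultiplier;
}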
Project: ditb    File: RegionReplicaReplicationEndpoint.java
protected WALEntryFilter getSkipReplayedEditsFilter() {
  return new SkipReplayedEditsFilter();
}
Project: ditb    File: RegionReplicaReplicationEndpoint.java
@Override
protected WALEntryFilter getScopeWALEntryFilter() {
  // we do not care about scope. We replicate everything.
  return null;
}
Project: ditb    File: VisibilityReplicationEndpoint.java
@Override
public WALEntryFilter getWALEntryfilter() {
  return delegator.getWALEntryfilter();
}
Project: pbase    File: VisibilityReplicationEndpoint.java
@Override
public WALEntryFilter getWALEntryfilter() {
  return delegator.getWALEntryfilter();
}
Project: hbase    File: RecoveredReplicationSourceWALReader.java
public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
    PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
    ReplicationSource source) {
  super(fs, conf, logQueue, startPosition, filter, source);
}
Project: hbase    File: RegionReplicaReplicationEndpoint.java
@Override
protected WALEntryFilter getScopeWALEntryFilter() {
  // we do not care about scope. We replicate everything.
  return null;
}
Project: hbase    File: VisibilityReplicationEndpoint.java
@Override
public WALEntryFilter getWALEntryfilter() {
  return delegator.getWALEntryfilter();
}