/**
 * Combines the filter supplied by the superclass with the skip-replayed-edits filter.
 * <p>
 * When both are present they are chained, superclass filter first. When only one is present it
 * is returned as-is, and when neither is present the result is {@code null}.
 * @return the combined filter, a single filter, or {@code null}
 */
@Override
public WALEntryFilter getWALEntryfilter() {
  WALEntryFilter inherited = super.getWALEntryfilter();
  WALEntryFilter skipReplayed = getSkipReplayedEditsFilter();
  if (inherited != null && skipReplayed != null) {
    // both filters exist: apply the inherited one first, then skip replayed edits
    ArrayList<WALEntryFilter> chain = Lists.newArrayList();
    chain.add(inherited);
    chain.add(skipReplayed);
    return new ChainWALEntryFilter(chain);
  }
  // at most one filter exists; hand back whichever it is (possibly null)
  return inherited != null ? inherited : skipReplayed;
}
private void initializeWALEntryFilter() { // get the WALEntryFilter from ReplicationEndpoint and add it to default filters ArrayList<WALEntryFilter> filters = Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter()); WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); if (filterFromEndpoint != null) { filters.add(filterFromEndpoint); } filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); this.walEntryFilter = new ChainWALEntryFilter(filters); }
/** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. * @param fs the files system to use * @param conf configuration to use * @param logQueue The WAL queue to read off of * @param startPosition position in the first WAL to start reading from * @param filter The filter to use while reading * @param source replication source */ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { this.logQueue = logQueue; this.currentPosition = startPosition; this.fs = fs; this.conf = conf; this.filter = filter; this.source = source; this.replicationBatchSizeCapacity = this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64); this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity + ", 
replicationBatchQueueCapacity=" + batchCount); }
/**
 * Creates a pass-through filter whose {@code filter} method returns the entry it was given,
 * unchanged.
 * @return a new filter that lets every entry through
 */
private WALEntryFilter getDummyFilter() {
  WALEntryFilter passThrough = new WALEntryFilter() {
    @Override
    public Entry filter(Entry walEntry) {
      // nothing is dropped or modified
      return walEntry;
    }
  };
  return passThrough;
}
@Override public void run() { // mark we are running now this.sourceRunning = true; try { // start the endpoint, connect to the cluster Service.State state = replicationEndpoint.start().get(); if (state != Service.State.RUNNING) { LOG.warn("ReplicationEndpoint was not started. Exiting"); uninitialize(); return; } } catch (Exception ex) { LOG.warn("Error starting ReplicationEndpoint, exiting", ex); throw new RuntimeException(ex); } // get the WALEntryFilter from ReplicationEndpoint and add it to default filters ArrayList<WALEntryFilter> filters = Lists.newArrayList( (WALEntryFilter)new SystemTableWALEntryFilter()); WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); if (filterFromEndpoint != null) { filters.add(filterFromEndpoint); } this.walEntryFilter = new ChainWALEntryFilter(filters); int sleepMultiplier = 1; // delay this until we are in an asynchronous thread while (this.isSourceActive() && this.peerClusterId == null) { this.peerClusterId = replicationEndpoint.getPeerUUID(); if (this.isSourceActive() && this.peerClusterId == null) { if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; } } } // In rare case, zookeeper setting may be messed up. 
That leads to the incorrect // peerClusterId value, which is the same as the source clusterId if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); // start workers for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) { String walGroupId = entry.getKey(); PriorityBlockingQueue<Path> queue = entry.getValue(); final ReplicationSourceWorkerThread worker = new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this); ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { LOG.debug("Starting up worker for wal group " + walGroupId); worker.startup(); } } }
/**
 * Creates the filter used to skip replayed edits.
 * @return a new {@link SkipReplayedEditsFilter}
 */
protected WALEntryFilter getSkipReplayedEditsFilter() {
  WALEntryFilter skipReplayed = new SkipReplayedEditsFilter();
  return skipReplayed;
}
@Override protected WALEntryFilter getScopeWALEntryFilter() { // we do not care about scope. We replicate everything. return null; }
/**
 * Forwards to the wrapped delegator.
 * @return whatever the delegator's {@code getWALEntryfilter()} returns
 */
@Override
public WALEntryFilter getWALEntryfilter() {
  WALEntryFilter delegated = delegator.getWALEntryfilter();
  return delegated;
}
/**
 * Creates the reader by passing every argument straight through to the superclass constructor.
 * @param fs file system handed to the superclass
 * @param conf configuration handed to the superclass
 * @param logQueue WAL queue handed to the superclass
 * @param startPosition start position handed to the superclass
 * @param filter WAL entry filter handed to the superclass
 * @param source replication source handed to the superclass
 */
public RecoveredReplicationSourceWALReader(
    FileSystem fs,
    Configuration conf,
    PriorityBlockingQueue<Path> logQueue,
    long startPosition,
    WALEntryFilter filter,
    ReplicationSource source) {
  super(fs, conf, logQueue, startPosition, filter, source);
}