private void initializeWALEntryFilter() { // get the WALEntryFilter from ReplicationEndpoint and add it to default filters ArrayList<WALEntryFilter> filters = Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter()); WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); if (filterFromEndpoint != null) { filters.add(filterFromEndpoint); } filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); this.walEntryFilter = new ChainWALEntryFilter(filters); }
@Override public void run() { // mark we are running now this.sourceRunning = true; try { // start the endpoint, connect to the cluster Service.State state = replicationEndpoint.start().get(); if (state != Service.State.RUNNING) { LOG.warn("ReplicationEndpoint was not started. Exiting"); uninitialize(); return; } } catch (Exception ex) { LOG.warn("Error starting ReplicationEndpoint, exiting", ex); throw new RuntimeException(ex); } // get the WALEntryFilter from ReplicationEndpoint and add it to default filters ArrayList<WALEntryFilter> filters = Lists.newArrayList( (WALEntryFilter)new SystemTableWALEntryFilter()); WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); if (filterFromEndpoint != null) { filters.add(filterFromEndpoint); } this.walEntryFilter = new ChainWALEntryFilter(filters); int sleepMultiplier = 1; // delay this until we are in an asynchronous thread while (this.isSourceActive() && this.peerClusterId == null) { this.peerClusterId = replicationEndpoint.getPeerUUID(); if (this.isSourceActive() && this.peerClusterId == null) { if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; } } } // In rare case, zookeeper setting may be messed up. That leads to the incorrect // peerClusterId value, which is the same as the source clusterId if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); // start workers for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) { String walGroupId = entry.getKey(); PriorityBlockingQueue<Path> queue = entry.getValue(); final ReplicationSourceWorkerThread worker = new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this); ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { LOG.debug("Starting up worker for wal group " + walGroupId); worker.startup(); } } }