/** * Adds a normal source per registered peer cluster and tries to process all * old region server wal queues */ protected void init() throws IOException, ReplicationException { for (String id : this.replicationPeers.getPeerIds()) { addSource(id); } List<String> currentReplicators = this.replicationQueues.getListOfReplicators(); if (currentReplicators == null || currentReplicators.size() == 0) { return; } List<String> otherRegionServers = replicationTracker.getListOfRegionServers(); LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); // Look if there's anything to process after a restart for (String rs : currentReplicators) { if (!otherRegionServers.contains(rs)) { transferQueues(rs); } } }
/**
 * Registers the peer {@code REGION_REPLICA_REPLICATION_PEER} when region replica replication
 * is enabled in {@code conf} and no peer config is stored under that id yet. Returns without
 * doing anything when the feature is disabled.
 *
 * @param conf configuration to use
 * @throws IOException on I/O failure, or wrapping a {@code ReplicationException} raised by
 *           the replication admin
 */
public static void setupRegionReplicaReplication(Configuration conf) throws IOException {
  if (!isRegionReplicaReplicationEnabled(conf)) {
    return;
  }

  ReplicationAdmin admin = new ReplicationAdmin(conf);
  try {
    if (admin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) != null) {
      // The peer is already registered; the finally block still closes the admin.
      return;
    }
    ReplicationPeerConfig replicaPeerConfig = new ReplicationPeerConfig();
    replicaPeerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
    replicaPeerConfig.setReplicationEndpointImpl(
        RegionReplicaReplicationEndpoint.class.getName());
    admin.addPeer(REGION_REPLICA_REPLICATION_PEER, replicaPeerConfig, null);
  } catch (ReplicationException e) {
    throw new IOException(e);
  } finally {
    admin.close();
  }
}
/**
 * Builds a {@code ReplicationPeerZKImpl} for each peer id returned by
 * {@code listPeerConfigs()}. A peer whose lookup throws {@code ReplicationException} is
 * logged and left out of the result.
 *
 * @return the peers that could be built, or {@code null} when no peer configs are listed
 */
@VisibleForTesting
List<ReplicationPeer> listReplicationPeers() {
  Map<String, ReplicationPeerConfig> peerConfigs = listPeerConfigs();
  if (peerConfigs == null || peerConfigs.isEmpty()) {
    return null;
  }

  List<ReplicationPeer> result = new ArrayList<ReplicationPeer>(peerConfigs.size());
  // Only the ids are needed here; the config itself is re-read through replicationPeers.
  for (String peerId : peerConfigs.keySet()) {
    try {
      Pair<ReplicationPeerConfig, Configuration> confPair =
          this.replicationPeers.getPeerConf(peerId);
      Configuration peerConf = confPair.getSecond();
      result.add(new ReplicationPeerZKImpl(peerConf, peerId, confPair.getFirst(),
          parseTableCFsFromConfig(this.getPeerTableCFs(peerId))));
    } catch (ReplicationException e) {
      LOG.warn("Failed to get valid replication peers. "
          + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
          + e.getMessage());
      LOG.debug("Failure details to get valid replication peers.", e);
    }
  }
  return result;
}
void preLogRoll(Path newLog) throws IOException { synchronized (this.walsById) { String name = newLog.getName(); for (ReplicationSourceInterface source : this.sources) { try { this.replicationQueues.addLog(source.getPeerClusterZnode(), name); } catch (ReplicationException e) { throw new IOException("Cannot add log to replication queue with id=" + source.getPeerClusterZnode() + ", filename=" + name, e); } } for (SortedSet<String> wals : this.walsById.values()) { if (this.sources.isEmpty()) { // If there's no slaves, don't need to keep the old wals since // we only consider the last one when a new slave comes in wals.clear(); } wals.add(name); } } this.latestPath = newLog; }
/** * Constructor that creates a connection to the local ZooKeeper ensemble. * @param conf Configuration to use * @throws IOException if an internal replication error occurs * @throws RuntimeException if replication isn't enabled. */ public ReplicationAdmin(Configuration conf) throws IOException { if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT)) { throw new RuntimeException("hbase.replication isn't true, please " + "enable it in order to use replication"); } this.connection = ConnectionFactory.createConnection(conf); zkw = createZooKeeperWatcher(); try { this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection); this.replicationPeers.init(); this.replicationQueuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); this.replicationQueuesClient.init(); } catch (ReplicationException e) { throw new IOException("Error initializing the replication admin client.", e); } }
/** * Adds a normal source per registered peer cluster and tries to process all * old region server hlog queues */ protected void init() throws IOException, ReplicationException { for (String id : this.replicationPeers.getConnectedPeers()) { addSource(id); } List<String> currentReplicators = this.replicationQueues.getListOfReplicators(); if (currentReplicators == null || currentReplicators.size() == 0) { return; } List<String> otherRegionServers = replicationTracker.getListOfRegionServers(); LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); // Look if there's anything to process after a restart for (String rs : currentReplicators) { if (!otherRegionServers.contains(rs)) { transferQueues(rs); } } }
void preLogRoll(Path newLog) throws IOException { synchronized (this.hlogsById) { String name = newLog.getName(); for (ReplicationSourceInterface source : this.sources) { try { this.replicationQueues.addLog(source.getPeerClusterZnode(), name); } catch (ReplicationException e) { throw new IOException("Cannot add log to replication queue with id=" + source.getPeerClusterZnode() + ", filename=" + name, e); } } for (SortedSet<String> hlogs : this.hlogsById.values()) { if (this.sources.isEmpty()) { // If there's no slaves, don't need to keep the old hlogs since // we only consider the last one when a new slave comes in hlogs.clear(); } hlogs.add(name); } } this.latestPath = newLog; }
/**
 * Constructor that creates a connection to the local ZooKeeper ensemble.
 * <p>
 * If construction fails after the connection or the ZooKeeper watcher has been obtained, they
 * are closed before the failure propagates: the caller never receives an instance on which it
 * could call close, so nothing else would release them.
 * @param conf Configuration to use
 * @throws IOException if an internal replication error occurs
 * @throws RuntimeException if replication isn't enabled.
 */
public ReplicationAdmin(Configuration conf) throws IOException {
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
      HConstants.REPLICATION_ENABLE_DEFAULT)) {
    throw new RuntimeException("hbase.replication isn't true, please "
        + "enable it in order to use replication");
  }
  this.connection = HConnectionManager.getConnection(conf);
  ZooKeeperWatcher zkw = null;
  boolean initialized = false;
  try {
    zkw = createZooKeeperWatcher();
    this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
    this.replicationPeers.init();
    this.replicationQueuesClient =
        ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
    this.replicationQueuesClient.init();
    initialized = true;
  } catch (ReplicationException e) {
    throw new IOException("Error initializing the replication admin client.", e);
  } finally {
    if (!initialized) {
      if (zkw != null) {
        zkw.close();
      }
      try {
        this.connection.close();
      } catch (IOException ignored) {
        // Best-effort cleanup: the failure that aborted construction is the one to report.
      }
    }
  }
}
@Override public boolean isFileDeletable(FileStatus fStat) { Set<String> hfileRefsFromQueue; // all members of this class are null if replication is disabled, // so do not stop from deleting the file if (getConf() == null) { return true; } try { hfileRefsFromQueue = rqs.getAllHFileRefs(); } catch (ReplicationException e) { LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable " + "file for " + fStat.getPath()); return false; } return !hfileRefsFromQueue.contains(fStat.getPath().getName()); }
private void adoptAbandonedQueues() { List<ServerName> currentReplicators = null; try { currentReplicators = queueStorage.getListOfReplicators(); } catch (ReplicationException e) { server.abort("Failed to get all replicators", e); return; } if (currentReplicators == null || currentReplicators.isEmpty()) { return; } List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream() .map(ServerName::valueOf).collect(Collectors.toList()); LOG.info( "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); // Look if there's anything to process after a restart for (ServerName rs : currentReplicators) { if (!otherRegionServers.contains(rs)) { transferQueues(rs); } } }
@Override public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws ReplicationException { Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs(); if (tableCFMap != null) { List<String> tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { this.queueStorage.addHFileRefs(peerId, pairs); metrics.incrSizeOfHFileRefsQueue(pairs.size()); } else { LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " + Bytes.toString(family) + " to peer id " + peerId); } } else { // user has explicitly not defined any table cfs for replication, means replicate all the // data this.queueStorage.addHFileRefs(peerId, pairs); metrics.incrSizeOfHFileRefsQueue(pairs.size()); } }
private void refreshPeerState(String peerId) throws ReplicationException, IOException { PeerState newState; Lock peerLock = peersLock.acquireLock(peerId); try { ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); if (peer == null) { throw new ReplicationException("Peer with id=" + peerId + " is not cached."); } PeerState oldState = peer.getPeerState(); newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); // RS need to start work with the new replication state change if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { replicationSourceManager.refreshSources(peerId); } } finally { peerLock.unlock(); } }
@Override public void updatePeerConfig(String peerId) throws ReplicationException, IOException { Lock peerLock = peersLock.acquireLock(peerId); try { ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); if (peer == null) { throw new ReplicationException("Peer with id=" + peerId + " is not cached."); } ReplicationPeerConfig oldConfig = peer.getPeerConfig(); ReplicationPeerConfig newConfig = replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); // RS need to start work with the new replication config change if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) { replicationSourceManager.refreshSources(peerId); } } finally { peerLock.unlock(); } }
/**
 * Verifies that no replication queue and no hfile-refs entry remains for the given peer id.
 *
 * @param peerId peer id to look for
 * @throws ReplicationException if the queue storage cannot be read
 * @throws DoNotRetryIOException if a queue of any replicator, or the hfile-refs queue, still
 *           belongs to {@code peerId}
 */
private void checkQueuesDeleted(String peerId)
    throws ReplicationException, DoNotRetryIOException {
  for (ServerName replicator : queueStorage.getListOfReplicators()) {
    for (String queueId : queueStorage.getAllQueues(replicator)) {
      if (!new ReplicationQueueInfo(queueId).getPeerId().equals(peerId)) {
        continue;
      }
      throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId
          + ", replicator: " + replicator + ", queueId: " + queueId);
    }
  }

  boolean hasHFileRefs = queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId);
  if (hasHFileRefs) {
    throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
  }
}
/**
 * Stores the results of {@code getUnDeletedQueues()} and {@code getUndeletedHFileRefsPeers()}
 * in the corresponding fields and reports one {@code UNDELETED_REPLICATION_QUEUE} error per
 * queue id and per hfile-refs peer id found.
 *
 * @throws ReplicationException if either lookup fails
 */
public void checkUnDeletedQueues() throws ReplicationException {
  undeletedQueueIds = getUnDeletedQueues();
  undeletedQueueIds.forEach((replicator, queueIds) ->
      queueIds.forEach(queueId ->
          errorReporter.reportError(
              HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
              "Undeleted replication queue for removed peer found: "
                  + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
                      new ReplicationQueueInfo(queueId).getPeerId(), replicator, queueId))));

  undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
  undeletedHFileRefsPeerIds.forEach(peerId ->
      errorReporter.reportError(
          HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
          "Undeleted replication hfile-refs queue for removed peer " + peerId + " found"));
}
@Test public void testIsFileDeletable() throws IOException, ReplicationException { // 1. Create a file Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs"); fs.createNewFile(file); // 2. Assert file is successfully created assertTrue("Test file not created!", fs.exists(file)); ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); cleaner.setConf(conf); // 3. Assert that file as is should be deletable assertTrue("Cleaner should allow to delete this file as there is no hfile reference node " + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file))); List<Pair<Path, Path>> files = new ArrayList<>(1); files.add(new Pair<>(null, file)); // 4. Add the file to hfile-refs queue rq.addHFileRefs(peerId, files); // 5. Assert file should not be deletable assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node " + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file))); }
/**
 * Appends table/column-family entries to the config of a replication peer: fetches the
 * current peer config, merges {@code tableCfs} into it via
 * {@code ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig}, and submits the
 * merged config as an update.
 *
 * @param id id of the peer to update
 * @param tableCfs map from table name to column family names to append; must not be null
 * @return a future that completes when the update completes; it is failed with a
 *         {@code ReplicationException} when {@code tableCfs} is null, and is expected to fail
 *         when either the config lookup or the update fails (this relies on
 *         {@code completeExceptionally(future, error)} failing the future and returning true
 *         for a non-null error -- helper not visible here, confirm)
 */
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
    Map<TableName, List<String>> tableCfs) {
  if (tableCfs == null) {
    return failedFuture(new ReplicationException("tableCfs is null"));
  }

  CompletableFuture<Void> future = new CompletableFuture<Void>();
  getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
    if (!completeExceptionally(future, error)) {
      ReplicationPeerConfig newPeerConfig =
          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
      updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
        // Inspect the update's own failure (err). The outer 'error' was already handled
        // above, so checking it here would report a failed update as a success.
        if (!completeExceptionally(future, err)) {
          future.complete(result);
        }
      });
    }
  });
  return future;
}
/** * Add sources for the given peer cluster on this region server. For the newly added peer, we only * need to enqueue the latest log of each wal group and do replication * @param id the id of the peer cluster * @return the source that was created * @throws IOException */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>(); this.walsById.put(id, walsByGroup); // Add the latest wal to that source's queue synchronized (latestPaths) { if (this.latestPaths.size() > 0) { for (Path logPath : latestPaths) { String name = logPath.getName(); String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name); SortedSet<String> logs = new TreeSet<String>(); logs.add(name); walsByGroup.put(walPrefix, logs); try { this.replicationQueues.addLog(id, name); } catch (ReplicationException e) { String message = "Cannot add log to queue when creating a new source, queueId=" + id + ", filename=" + name; server.stop(message); throw e; } src.enqueueLog(logPath); } } } } src.startup(); return src; }
/**
 * Starts the replication service when the {@code replication} flag is set: initializes the
 * replication manager, creates the replication sink, and schedules the statistics task at a
 * fixed rate of {@code statsThreadPeriod} seconds. Does nothing otherwise.
 *
 * @throws IOException on I/O failure, or wrapping a {@code ReplicationException} raised while
 *           initializing the replication manager
 */
public void startReplicationService() throws IOException {
  if (!this.replication) {
    return;
  }

  try {
    this.replicationManager.init();
  } catch (ReplicationException e) {
    throw new IOException(e);
  }

  this.replicationSink = new ReplicationSink(this.conf, this.server);
  this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
      statsThreadPeriod,
      statsThreadPeriod,
      TimeUnit.SECONDS);
}
@Test public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { // create a table with region replicas. Check whether the replication peer is created // and replication started. ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); String peerId = "region_replica_replication"; if (admin.getPeerConfig(peerId) != null) { admin.removePeer(peerId); } HTableDescriptor htd = HTU.createTableDescriptor( "testReplicationPeerIsCreated_no_region_replicas"); HTU.getHBaseAdmin().createTable(htd); ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); assertNull(peerConfig); htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); htd.setRegionReplication(2); HTU.getHBaseAdmin().createTable(htd); // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); }
/**
 * Append the replicable table-cf config of the specified peer.
 * <p>
 * For each table in {@code tableCfs}, a {@code null} or empty family collection is stored as
 * {@code null} for that table (the table is listed without specific families). This holds
 * whether or not the table already has families listed, so appending a whole table is never
 * silently ignored. Otherwise the families are merged with those already stored.
 * @param id a short that identifies the cluster
 * @param tableCfs A map from tableName to column family names
 * @throws ReplicationException if {@code tableCfs} is null or the peer config cannot be read
 *           or written
 */
public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
    throws ReplicationException {
  if (tableCfs == null) {
    throw new ReplicationException("tableCfs is null");
  }
  Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
  if (preTableCfs == null) {
    // Nothing stored yet: the given map becomes the config as-is.
    setPeerTableCFs(id, tableCfs);
    return;
  }

  for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
    TableName table = entry.getKey();
    Collection<String> appendCfs = entry.getValue();
    boolean appendWholeTable = appendCfs == null || appendCfs.isEmpty();

    if (preTableCfs.containsKey(table)) {
      List<String> cfs = preTableCfs.get(table);
      if (cfs == null || appendWholeTable) {
        preTableCfs.put(table, null);
      } else {
        // Merge through a set so a family is not listed twice.
        Set<String> cfSet = new HashSet<String>(cfs);
        cfSet.addAll(appendCfs);
        preTableCfs.put(table, Lists.newArrayList(cfSet));
      }
    } else {
      if (appendWholeTable) {
        preTableCfs.put(table, null);
      } else {
        preTableCfs.put(table, Lists.newArrayList(appendCfs));
      }
    }
  }
  setPeerTableCFs(id, preTableCfs);
}
/**
 * Removes tables or column families from the table-cfs config of a peer.
 * <p>
 * A table stored without families can only be removed as a whole (with a {@code null} family
 * collection). A table stored with families can only have families removed, and is dropped
 * once none remain.
 *
 * @param id a short name that identifies the cluster
 * @param tableCfs A map from tableName to column family names
 * @throws ReplicationException if {@code tableCfs} is null, the peer has no table-cfs config,
 *           a table is not in the config, or the request does not match how the table is
 *           stored
 */
public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
    throws ReplicationException {
  if (tableCfs == null) {
    throw new ReplicationException("tableCfs is null");
  }

  Map<TableName, List<String>> storedTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
  if (storedTableCfs == null) {
    throw new ReplicationException("Table-Cfs for peer" + id + " is null");
  }

  for (Map.Entry<TableName, ? extends Collection<String>> request : tableCfs.entrySet()) {
    TableName table = request.getKey();
    Collection<String> removeCfs = request.getValue();

    if (!storedTableCfs.containsKey(table)) {
      throw new ReplicationException("No table: " + table + " in table-cfs config of peer: "
          + id);
    }

    List<String> storedCfs = storedTableCfs.get(table);
    if (storedCfs == null) {
      if (removeCfs != null) {
        throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from table-cfs config in peer: " + id);
      }
      storedTableCfs.remove(table);
    } else {
      if (removeCfs == null) {
        throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from table-cfs config in peer: " + id);
      }
      Set<String> remaining = new HashSet<String>(storedCfs);
      remaining.removeAll(removeCfs);
      if (remaining.isEmpty()) {
        storedTableCfs.remove(table);
      } else {
        storedTableCfs.put(table, Lists.newArrayList(remaining));
      }
    }
  }

  setPeerTableCFs(id, storedTableCfs);
}
/**
 * Adds a replication peer whose cluster key is derived from {@code configuration} and whose
 * replication endpoint is {@code HbaseEndpoint}. The {@code ReplicationAdmin} used for the
 * call is closed before returning.
 *
 * @param configuration configuration used to create the admin and to derive the cluster key
 * @param peerName id under which the peer is added
 * @param tableCFs table-to-column-families map passed through to {@code addPeer}
 * @throws ReplicationException if the admin fails to add the peer
 * @throws IOException if creating or closing the admin fails
 */
protected void addPeer(final Configuration configuration, String peerName,
    Map<TableName, List<String>> tableCFs) throws ReplicationException, IOException {
  final String endpointClassName = HbaseEndpoint.class.getName();
  try (ReplicationAdmin admin = new ReplicationAdmin(configuration)) {
    ReplicationPeerConfig newPeerConfig = new ReplicationPeerConfig()
        .setClusterKey(ZKConfig.getZooKeeperClusterKey(configuration))
        .setReplicationEndpointImpl(endpointClassName);
    admin.addPeer(peerName, newPeerConfig, tableCFs);
  }
}
/** * Add a new normal source to this region server * @param id the id of the peer cluster * @return the source that was created * @throws IOException */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); this.walsById.put(id, new TreeSet<String>()); // Add the latest wal to that source's queue if (this.latestPath != null) { String name = this.latestPath.getName(); this.walsById.get(id).add(name); try { this.replicationQueues.addLog(src.getPeerClusterZnode(), name); } catch (ReplicationException e) { String message = "Cannot add log to queue when creating a new source, queueId=" + src.getPeerClusterZnode() + ", filename=" + name; server.stop(message); throw e; } src.enqueueLog(this.latestPath); } } src.startup(); return src; }
/**
 * Looks up the configuration of the peer identified by {@code peerId} through a temporary
 * ZooKeeper watcher and returns the ZooKeeper cluster key derived from it. The watcher is
 * closed before returning.
 *
 * @param conf configuration used to create the watcher and the replication peers
 * @return the cluster key of the peer's configuration
 * @throws IOException if the peer conf cannot be obtained, or wrapping a
 *           {@code ReplicationException} raised during the lookup
 */
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
        new Abortable() {
          @Override
          public void abort(String why, Throwable e) {
          }

          @Override
          public boolean isAborted() {
            return false;
          }
        });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
    if (pair == null) {
      throw new IOException("Couldn't get peer conf!");
    }
    // Use the pair that was just null-checked rather than fetching the conf a second time.
    Configuration peerConf = pair.getSecond();
    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occurred while trying to connect to the remote peer cluster", e);
  } finally {
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
/**
 * Append the replicable table-cf config of the specified peer.
 * <p>
 * For each table in {@code tableCfs}, a {@code null} or empty family collection is stored as
 * {@code null} for that table (the table is listed without specific families). This holds
 * whether or not the table already has families listed, so appending a whole table is never
 * silently ignored. Otherwise the families are merged with those already stored.
 * @param id a short that identifies the cluster
 * @param tableCfs A map from tableName to column family names
 * @throws ReplicationException if {@code tableCfs} is null or the peer config cannot be read
 *           or written
 */
public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
    throws ReplicationException {
  if (tableCfs == null) {
    throw new ReplicationException("tableCfs is null");
  }
  Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
  if (preTableCfs == null) {
    // Nothing stored yet: the given map becomes the config as-is.
    setPeerTableCFs(id, tableCfs);
    return;
  }

  for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
    TableName table = entry.getKey();
    Collection<String> appendCfs = entry.getValue();
    boolean appendWholeTable = appendCfs == null || appendCfs.isEmpty();

    if (preTableCfs.containsKey(table)) {
      List<String> cfs = preTableCfs.get(table);
      if (cfs == null || appendWholeTable) {
        preTableCfs.put(table, null);
      } else {
        // Merge through a set so a family is not listed twice.
        Set<String> cfSet = new HashSet<String>(cfs);
        cfSet.addAll(appendCfs);
        preTableCfs.put(table, Lists.newArrayList(cfSet));
      }
    } else {
      if (appendWholeTable) {
        preTableCfs.put(table, null);
      } else {
        preTableCfs.put(table, Lists.newArrayList(appendCfs));
      }
    }
  }
  setPeerTableCFs(id, preTableCfs);
}
/** * Add a new normal source to this region server * @param id the id of the peer cluster * @return the source that was created * @throws IOException */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, stopper, id, this.clusterId); synchronized (this.hlogsById) { this.sources.add(src); this.hlogsById.put(id, new TreeSet<String>()); // Add the latest hlog to that source's queue if (this.latestPath != null) { String name = this.latestPath.getName(); this.hlogsById.get(id).add(name); try { this.replicationQueues.addLog(src.getPeerClusterZnode(), name); } catch (ReplicationException e) { String message = "Cannot add log to queue when creating a new source, queueId=" + src.getPeerClusterZnode() + ", filename=" + name; stopper.stop(message); throw e; } src.enqueueLog(this.latestPath); } } src.startup(); return src; }
/**
 * Looks up the configuration of the peer identified by {@code peerId} through a temporary
 * ZooKeeper watcher and returns the ZooKeeper cluster key derived from it. The watcher is
 * closed before returning.
 *
 * @param conf configuration used to create the watcher and the replication peers
 * @return the cluster key of the peer's configuration
 * @throws IOException if the peer conf cannot be obtained, or wrapping a
 *           {@code ReplicationException} raised during the lookup
 */
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
  ZooKeeperWatcher localZKW = null;
  try {
    localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
        new Abortable() {
          @Override
          public void abort(String why, Throwable e) {
          }

          @Override
          public boolean isAborted() {
            return false;
          }
        });

    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
    rp.init();

    Configuration peerConf = rp.getPeerConf(peerId);
    if (peerConf == null) {
      throw new IOException("Couldn't get peer conf!");
    }
    return ZKUtil.getZooKeeperClusterKey(peerConf);
  } catch (ReplicationException e) {
    throw new IOException(
        "An error occurred while trying to connect to the remote peer cluster", e);
  } finally {
    // Only the watcher is opened here, so it is the only resource to release.
    if (localZKW != null) {
      localZKW.close();
    }
  }
}
/**
 * Runs {@code copyTableCFs(peerId)} for every peer id listed by the peer storage and logs an
 * error for each peer for which it returns false.
 *
 * @throws ReplicationException if listing the peers or copying fails
 */
public void copyTableCFs() throws ReplicationException {
  for (String peerId : peerStorage.listPeerIds()) {
    boolean copied = copyTableCFs(peerId);
    if (copied) {
      continue;
    }
    LOG.error("upgrade tableCFs failed for peerId=" + peerId);
  }
}
@Override public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { // all members of this class are null if replication is disabled, // so we cannot filter the files if (this.getConf() == null) { return files; } final Set<String> hfileRefs; try { // The concurrently created new hfile entries in ZK may not be included in the return list, // but they won't be deleted because they're not in the checking set. hfileRefs = rqs.getAllHFileRefs(); } catch (ReplicationException e) { LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files"); return Collections.emptyList(); } return Iterables.filter(files, new Predicate<FileStatus>() { @Override public boolean apply(FileStatus file) { String hfile = file.getPath().getName(); boolean foundHFileRefInQueue = hfileRefs.contains(hfile); if (LOG.isDebugEnabled()) { if (foundHFileRefInQueue) { LOG.debug("Found hfile reference in ZK, keeping: " + hfile); } else { LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile); } } return !foundHFileRefInQueue; } }); }