/** * Waits until there is only one log(the current writing one) in the replication queue * @param numRs number of regionservers */ private void waitForLogAdvance(int numRs) throws Exception { Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { for (int i = 0; i < numRs; i++) { HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); RegionInfo regionInfo = utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName(); Replication replicationService = (Replication) utility1.getHBaseCluster() .getRegionServer(i).getReplicationSourceService(); for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() .getSources()) { ReplicationSource source = (ReplicationSource) rsi; if (!currentFile.equals(source.getCurrentPath())) { return false; } } } return true; } }); }