/**
 * Ships this task's entries to a sink obtained from the replication sink manager.
 *
 * <p>On success the sink is reported as good and this task's {@code ordinal} is returned. If an
 * {@link IOException} is raised after a sink was obtained, that sink is reported as bad before
 * the exception is rethrown.
 *
 * @return the {@code ordinal} of this task
 * @throws IOException if obtaining the sink or shipping the entries fails
 */
@Override
public Integer call() throws IOException {
  SinkPeer peer = null;
  try {
    peer = replicationSinkMgr.getReplicationSink();
    BlockingInterface sinkAdmin = peer.getRegionServer();
    Entry[] batch = entries.toArray(new Entry[entries.size()]);
    ReplicationProtbufUtil.replicateWALEntry(sinkAdmin, batch);
    replicationSinkMgr.reportSinkSuccess(peer);
    return ordinal;
  } catch (IOException failure) {
    // peer is still null when getReplicationSink() itself failed; nothing to report then.
    if (peer != null) {
      replicationSinkMgr.reportBadSink(peer);
    }
    throw failure;
  }
}
@Test public void testReportBadSink() { ServerName serverNameA = mock(ServerName.class); ServerName serverNameB = mock(ServerName.class); when(replicationEndpoint.getRegionServers()) .thenReturn(Lists.newArrayList(serverNameA, serverNameB)); sinkManager.chooseSinks(); // Sanity check assertEquals(1, sinkManager.getNumSinks()); SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); sinkManager.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect assertEquals(1, sinkManager.getNumSinks()); }
@Test public void testReportBadSink() { ServerName serverNameA = mock(ServerName.class); ServerName serverNameB = mock(ServerName.class); when(replicationEndpoint.getRegionServers()) .thenReturn(Lists.newArrayList(serverNameA, serverNameB)); sinkManager.chooseSinks(); // Sanity check assertEquals(1, sinkManager.getSinks().size()); SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); sinkManager.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect assertEquals(1, sinkManager.getSinks().size()); }
@Test public void testReportBadSink() { ServerName serverNameA = mock(ServerName.class); ServerName serverNameB = mock(ServerName.class); when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn( Lists.newArrayList(serverNameA, serverNameB)); sinkManager.chooseSinks(); // Sanity check assertEquals(1, sinkManager.getSinks().size()); SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); sinkManager.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect assertEquals(1, sinkManager.getSinks().size()); }
/**
 * Replicates this task's entries to a sink obtained from the replication sink manager, passing
 * along the replication cluster id and the namespace / hfile archive directories.
 *
 * <p>A successful shipment reports the sink as good and returns this task's {@code ordinal}. If
 * an {@link IOException} occurs once a sink has been obtained, the sink is reported as bad and
 * the exception is propagated unchanged.
 *
 * @return the {@code ordinal} of this task
 * @throws IOException if obtaining the sink or replicating the entries fails
 */
@Override
public Integer call() throws IOException {
  SinkPeer target = null;
  try {
    target = replicationSinkMgr.getReplicationSink();
    BlockingInterface targetAdmin = target.getRegionServer();
    replicateEntries(targetAdmin, entries, replicationClusterId, baseNamespaceDir,
      hfileArchiveDir);
    replicationSinkMgr.reportSinkSuccess(target);
    return ordinal;
  } catch (IOException cause) {
    // No sink to blame if the failure happened before one was handed out.
    if (target != null) {
      replicationSinkMgr.reportBadSink(target);
    }
    throw cause;
  }
}
@Test public void testReportBadSink_DownToZeroSinks() { List<ServerName> serverNames = Lists.newArrayList(); for (int i = 0; i < 20; i++) { serverNames.add(mock(ServerName.class)); } when(replicationEndpoint.getRegionServers()) .thenReturn(serverNames); sinkManager.chooseSinks(); // Sanity check List<ServerName> sinkList = sinkManager.getSinksForTesting(); assertEquals(2, sinkList.size()); ServerName serverNameA = sinkList.get(0); ServerName serverNameB = sinkList.get(1); SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class)); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { sinkManager.reportBadSink(sinkPeerA); sinkManager.reportBadSink(sinkPeerB); } // We've gone down to 0 good sinks, so the replication sinks // should have been refreshed now assertEquals(2, sinkManager.getNumSinks()); }
/** * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not * be replicated to anymore. */ @Test public void testReportBadSink_PastThreshold() { List<ServerName> serverNames = Lists.newArrayList(); for (int i = 0; i < 20; i++) { serverNames.add(mock(ServerName.class)); } when(replicationEndpoint.getRegionServers()) .thenReturn(serverNames); sinkManager.chooseSinks(); // Sanity check assertEquals(2, sinkManager.getSinks().size()); ServerName serverName = sinkManager.getSinks().get(0); SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { sinkManager.reportBadSink(sinkPeer); } // Reporting a bad sink more than the threshold count should remove it // from the list of potential sinks assertEquals(1, sinkManager.getSinks().size()); }
@Test public void testReportBadSink_DownToZeroSinks() { List<ServerName> serverNames = Lists.newArrayList(); for (int i = 0; i < 20; i++) { serverNames.add(mock(ServerName.class)); } when(replicationEndpoint.getRegionServers()) .thenReturn(serverNames); sinkManager.chooseSinks(); // Sanity check List<ServerName> sinkList = sinkManager.getSinks(); assertEquals(2, sinkList.size()); ServerName serverNameA = sinkList.get(0); ServerName serverNameB = sinkList.get(1); SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class)); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { sinkManager.reportBadSink(sinkPeerA); sinkManager.reportBadSink(sinkPeerB); } // We've gone down to 0 good sinks, so the replication sinks // should have been refreshed now assertEquals(2, sinkManager.getSinks().size()); }
/** * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not * be replicated to anymore. */ @Test public void testReportBadSink_PastThreshold() { List<ServerName> serverNames = Lists.newArrayList(); for (int i = 0; i < 20; i++) { serverNames.add(mock(ServerName.class)); } when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)) .thenReturn(serverNames); sinkManager.chooseSinks(); // Sanity check assertEquals(2, sinkManager.getSinks().size()); ServerName serverName = sinkManager.getSinks().get(0); SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { sinkManager.reportBadSink(sinkPeer); } // Reporting a bad sink more than the threshold count should remove it // from the list of potential sinks assertEquals(1, sinkManager.getSinks().size()); }
@Test public void testReportBadSink_DownToZeroSinks() { List<ServerName> serverNames = Lists.newArrayList(); for (int i = 0; i < 20; i++) { serverNames.add(mock(ServerName.class)); } when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)) .thenReturn(serverNames); sinkManager.chooseSinks(); // Sanity check List<ServerName> sinkList = sinkManager.getSinks(); assertEquals(2, sinkList.size()); ServerName serverNameA = sinkList.get(0); ServerName serverNameB = sinkList.get(1); SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class)); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { sinkManager.reportBadSink(sinkPeerA); sinkManager.reportBadSink(sinkPeerB); } // We've gone down to 0 good sinks, so the replication sinks // should have been refreshed now assertEquals(2, sinkManager.getSinks().size()); }
/** * Do the shipping logic */ @Override public boolean replicate(ReplicateContext replicateContext) { List<Entry> entries = replicateContext.getEntries(); int sleepMultiplier = 1; while (this.isRunning()) { if (!peersSelected) { connectToPeers(); peersSelected = true; } if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { sleepMultiplier++; } continue; } SinkPeer sinkPeer = null; try { sinkPeer = replicationSinkMgr.getReplicationSink(); BlockingInterface rrs = sinkPeer.getRegionServer(); if (LOG.isTraceEnabled()) { LOG.trace("Replicating " + entries.size() + " entries of total size " + replicateContext.getSize()); } ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()])); // update metrics this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); return true; } catch (IOException ioe) { // Didn't ship anything, but must still age the last time we did this.metrics.refreshAgeOfLastShippedOp(); if (ioe instanceof RemoteException) { ioe = ((RemoteException) ioe).unwrapRemoteException(); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); if (ioe instanceof TableNotFoundException) { if (sleepForRetries("A table is missing in the peer cluster. " + "Replication cannot proceed without losing data.", sleepMultiplier)) { sleepMultiplier++; } } } else { if (ioe instanceof SocketTimeoutException) { // This exception means we waited for more than 60s and nothing // happened, the cluster is alive and calling it right away // even for a test just makes things worse. sleepForRetries("Encountered a SocketTimeoutException. 
Since the " + "call to the remote cluster timed out, which is usually " + "caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier); } else if (ioe instanceof ConnectException) { LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); replicationSinkMgr.chooseSinks(); } else { LOG.warn("Can't replicate because of a local or network error: ", ioe); } } if (sinkPeer != null) { replicationSinkMgr.reportBadSink(sinkPeer); } if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { sleepMultiplier++; } } } return false; // in case we exited before replicating }