/**
 * Ships this task's {@code entries} to a replication sink obtained from
 * {@code replicationSinkMgr} and reports the outcome back to that manager.
 *
 * @return this task's {@code ordinal} once the entries have been shipped
 * @throws IOException if obtaining the sink or shipping the entries fails; when a
 *         sink had already been obtained it is reported as bad before rethrowing
 */
@Override
public Integer call() throws IOException {
  // Stays null until a sink has actually been handed out, so the failure path
  // knows whether there is anything to report.
  SinkPeer chosenSink = null;
  try {
    chosenSink = replicationSinkMgr.getReplicationSink();
    final BlockingInterface sinkServer = chosenSink.getRegionServer();
    final Entry[] batch = entries.toArray(new Entry[entries.size()]);
    ReplicationProtbufUtil.replicateWALEntry(sinkServer, batch);
    replicationSinkMgr.reportSinkSuccess(chosenSink);
    return ordinal;
  } catch (IOException shippingFailure) {
    if (chosenSink != null) {
      replicationSinkMgr.reportBadSink(chosenSink);
    }
    throw shippingFailure;
  }
}
@Override public synchronized Void call() throws IOException { errors.progress(); try { BlockingInterface server = connection.getAdmin(rsinfo); // list all online regions from this region server List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(server); regions = filterRegions(regions); if (details) { errors.detail("RegionServer: " + rsinfo.getServerName() + " number of regions: " + regions.size()); for (HRegionInfo rinfo: regions) { errors.detail(" " + rinfo.getRegionNameAsString() + " id: " + rinfo.getRegionId() + " encoded_name: " + rinfo.getEncodedName() + " start: " + Bytes.toStringBinary(rinfo.getStartKey()) + " end: " + Bytes.toStringBinary(rinfo.getEndKey())); } } // check to see if the existence of this region matches the region in META for (HRegionInfo r:regions) { HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName()); hbi.addServer(r, rsinfo); } } catch (IOException e) { // unable to connect to the region server. errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "RegionServer: " + rsinfo.getServerName() + " Unable to fetch region information. " + e); throw e; } return null; }
@Override public synchronized Void call() throws IOException { errors.progress(); try { BlockingInterface server = connection.getAdmin(rsinfo); // list all online regions from this region server List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(server); regions = filterRegions(regions); if (details) { errors.detail("RegionServer: " + rsinfo.getServerName() + " number of regions: " + regions.size()); for (HRegionInfo rinfo: regions) { errors.detail(" " + rinfo.getRegionNameAsString() + " id: " + rinfo.getRegionId() + " encoded_name: " + rinfo.getEncodedName() + " start: " + Bytes.toStringBinary(rinfo.getStartKey()) + " end: " + Bytes.toStringBinary(rinfo.getEndKey())); } } // check to see if the existence of this region matches the region in META for (HRegionInfo r:regions) { HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName()); if (!RegionReplicaUtil.isDefaultReplica(r)) hbi.setSkipChecks(true); hbi.addServer(r, rsinfo); } } catch (IOException e) { // unable to connect to the region server. errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "RegionServer: " + rsinfo.getServerName() + " Unable to fetch region information. " + e); throw e; } return null; }
/** * Do the shipping logic */ @Override public boolean replicate(ReplicateContext replicateContext) { List<Entry> entries = replicateContext.getEntries(); int sleepMultiplier = 1; while (this.isRunning()) { if (!peersSelected) { connectToPeers(); peersSelected = true; } if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { sleepMultiplier++; } continue; } SinkPeer sinkPeer = null; try { sinkPeer = replicationSinkMgr.getReplicationSink(); BlockingInterface rrs = sinkPeer.getRegionServer(); if (LOG.isTraceEnabled()) { LOG.trace("Replicating " + entries.size() + " entries of total size " + replicateContext.getSize()); } ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()])); // update metrics this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); return true; } catch (IOException ioe) { // Didn't ship anything, but must still age the last time we did this.metrics.refreshAgeOfLastShippedOp(); if (ioe instanceof RemoteException) { ioe = ((RemoteException) ioe).unwrapRemoteException(); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); if (ioe instanceof TableNotFoundException) { if (sleepForRetries("A table is missing in the peer cluster. " + "Replication cannot proceed without losing data.", sleepMultiplier)) { sleepMultiplier++; } } } else { if (ioe instanceof SocketTimeoutException) { // This exception means we waited for more than 60s and nothing // happened, the cluster is alive and calling it right away // even for a test just makes things worse. sleepForRetries("Encountered a SocketTimeoutException. 
Since the " + "call to the remote cluster timed out, which is usually " + "caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier); } else if (ioe instanceof ConnectException) { LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); replicationSinkMgr.chooseSinks(); } else { LOG.warn("Can't replicate because of a local or network error: ", ioe); } } if (sinkPeer != null) { replicationSinkMgr.reportBadSink(sinkPeer); } if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { sleepMultiplier++; } } } return false; // in case we exited before replicating }