/**
 * Replays a batch of WAL entries against the given region location.
 * <p>
 * The call is issued through a caller obtained from {@link RpcRetryingCallerFactory} and is
 * bounded by {@code replayTimeout}. When {@code skipErrors} is set, an {@link IOException} from
 * the call is logged and swallowed; otherwise it is rethrown to the caller.
 *
 * @param regionLoc location passed to the replay callable
 * @param regionInfo region info passed to the replay callable
 * @param entries WAL entries to replay
 * @throws IOException if the replay call fails and {@code skipErrors} is not set
 */
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
    final List<Entry> entries) throws IOException {
  try {
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
    ReplayServerCallable<ReplicateWALEntryResponse> callable =
        new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
            regionInfo, entries);
    factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
  } catch (IOException ie) {
    if (!skipErrors) {
      throw ie;
    }
    // The failure is being swallowed here, so hand the exception itself to the logger:
    // the message alone drops the stack trace and the cause chain.
    LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
        + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
  }
}
/**
 * Applies the WAL entries carried by the request through the region server's replication sink
 * handler, calling the region server coprocessor host's pre/post replicate hooks around it.
 *
 * @param controller the RPC controller; cast to {@code PayloadCarryingRpcController} to obtain
 *          the cell scanner
 * @param request the request holding the WAL entries
 * @return an empty response once the entries have been handed to the sink
 * @throws ServiceException if the replication sink handler is not set, or wrapping any
 *           {@link IOException} raised while processing
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    checkOpen();
    // Guard clause: nothing can be replicated without a sink handler.
    if (regionServer.replicationSinkHandler == null) {
      throw new ServiceException("Replication services are not initialized yet");
    }
    requestCount.increment();
    List<WALEntry> walEntries = request.getEntryList();
    PayloadCarryingRpcController payloadController = (PayloadCarryingRpcController) controller;
    CellScanner scanner = payloadController.cellScanner();
    regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(walEntries, scanner);
    regionServer.replicationSinkHandler.replicateLogEntries(walEntries, scanner);
    regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(walEntries, scanner);
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * Drains the queue and, for every entry, issues a {@code RegionReplicaReplayCallable} through a
 * retrying caller with a 10000 timeout argument.
 * <p>
 * The row of the first cell of each entry's edit is used to locate the region, and the callable
 * is pointed at the location returned by {@code getRegionLocation(1)}.
 *
 * @param connection cluster connection used for the region lookup and the callable
 * @param entries queue of entries; it is emptied by this method
 * @throws IOException if locating the region or the retried call fails
 */
private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
    throws IOException, RuntimeException {
  for (Entry next = entries.poll(); next != null; next = entries.poll()) {
    byte[] rowKey = next.getEdit().getCells().get(0).getRow();
    RegionLocations regionLocations = connection.locateRegion(tableName, rowKey, true, true);

    RegionReplicaReplayCallable replayCallable = new RegionReplicaReplayCallable(
        connection,
        RpcControllerFactory.instantiate(connection.getConfiguration()),
        table.getName(),
        regionLocations.getRegionLocation(1),
        regionLocations.getRegionLocation(1).getRegionInfo(),
        rowKey,
        Lists.newArrayList(next),
        new AtomicLong());

    RpcRetryingCallerFactory callerFactory =
        RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
    callerFactory.<ReplicateWALEntryResponse> newCaller().callWithRetries(replayCallable, 10000);
  }
}
/**
 * Applies the WAL entries carried by the request through the region server's replication sink
 * handler, calling the region server coprocessor host's pre/post replicate hooks around it.
 * When no sink handler is set the request is ignored and an empty response is returned.
 *
 * @param controller the RPC controller; cast to {@code PayloadCarryingRpcController} to obtain
 *          the cell scanner
 * @param request the request holding the WAL entries
 * @return an empty response
 * @throws ServiceException wrapping any {@link IOException} raised while processing
 */
@Override
@QosPriority(priority = HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    // Without a sink handler there is nothing to do; answer with an empty response.
    if (regionServer.replicationSinkHandler == null) {
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    checkOpen();
    requestCount.increment();
    List<WALEntry> walEntries = request.getEntryList();
    PayloadCarryingRpcController payloadController = (PayloadCarryingRpcController) controller;
    CellScanner scanner = payloadController.cellScanner();
    regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(walEntries, scanner);
    regionServer.replicationSinkHandler.replicateLogEntries(walEntries, scanner);
    regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(walEntries, scanner);
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * Hands the WAL entries carried by the request, together with the controller's cell scanner, to
 * the replication sink handler. When no sink handler is set the request is ignored and an empty
 * response is returned.
 *
 * @param controller the RPC controller; cast to {@code PayloadCarryingRpcController} to obtain
 *          the cell scanner
 * @param request the request holding the WAL entries
 * @return an empty response
 * @throws ServiceException wrapping any {@link IOException} raised while processing
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    // Without a sink handler there is nothing to do; answer with an empty response.
    if (replicationSinkHandler == null) {
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    checkOpen();
    requestCount.increment();
    this.replicationSinkHandler.replicateLogEntries(
        request.getEntryList(),
        ((PayloadCarryingRpcController) controller).cellScanner());
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * Replays a batch of WAL entries against the given region location.
 * <p>
 * The call is issued through a caller obtained from {@link RpcRetryingCallerFactory} and is
 * bounded by {@code replayTimeout}. When {@code skipErrors} is set, an {@link IOException} from
 * the call is logged and swallowed; otherwise it is rethrown to the caller.
 *
 * @param regionLoc location passed to the replay callable
 * @param regionInfo region info passed to the replay callable
 * @param entries WAL entries to replay
 * @throws IOException if the replay call fails and {@code skipErrors} is not set
 */
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
    final List<HLog.Entry> entries) throws IOException {
  try {
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
    ReplayServerCallable<ReplicateWALEntryResponse> callable =
        new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
            regionInfo, entries);
    factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
  } catch (IOException ie) {
    if (!skipErrors) {
      throw ie;
    }
    // The failure is being swallowed here, so hand the exception itself to the logger:
    // the message alone drops the stack trace and the cause chain.
    LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
        + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
  }
}
/**
 * Hands the WAL entries carried by the request, together with the controller's cell scanner, to
 * the region server's replication sink handler. When no sink handler is set the request is
 * ignored and an empty response is returned.
 *
 * @param controller the RPC controller; cast to {@code PayloadCarryingRpcController} to obtain
 *          the cell scanner
 * @param request the request holding the WAL entries
 * @return an empty response
 * @throws ServiceException wrapping any {@link IOException} raised while processing
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    // Without a sink handler there is nothing to do; answer with an empty response.
    if (regionServer.replicationSinkHandler == null) {
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    checkOpen();
    requestCount.increment();
    regionServer.replicationSinkHandler.replicateLogEntries(
        request.getEntryList(),
        ((PayloadCarryingRpcController) controller).cellScanner());
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * Converts the request's entry list to {@code HLog.Entry} objects and, if the conversion yields
 * at least one entry, hands them to the replication sink handler. When no sink handler is set
 * the request is ignored and an empty response is returned.
 *
 * @param controller the RPC controller (not used by this variant)
 * @param request the request holding the WAL entries
 * @return an empty response
 * @throws ServiceException wrapping any {@link IOException} raised while processing
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    // Without a sink handler there is nothing to do; answer with an empty response.
    if (replicationSinkHandler == null) {
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    checkOpen();
    requestCount.increment();
    HLog.Entry[] converted = ProtobufUtil.toHLogEntries(request.getEntryList());
    boolean hasEntries = converted != null && converted.length > 0;
    if (hasEntries) {
      replicationSinkHandler.replicateLogEntries(converted);
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * Replays this callable's entries for its region via {@code replayToServer}.
 *
 * @param callTimeout timeout supplied by the caller; not used by this implementation
 * @return always {@code null}
 * @throws IOException the remote exception extracted from a {@code ServiceException}
 */
@Override
public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
  try {
    replayToServer(this.regionInfo, this.entries);
    return null;
  } catch (ServiceException serviceFailure) {
    // Unwrap to the IOException the remote side actually raised.
    throw ProtobufUtil.getRemoteException(serviceFailure);
  }
}
@Override public ReplicateWALEntryResponse replay(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { // TODO Auto-generated method stub return null; }
/**
 * Replays this callable's entries for its region via {@code replayToServer}.
 *
 * @return always {@code null}
 * @throws IOException the remote exception extracted from a {@code ServiceException}
 */
@Override
public ReplicateWALEntryResponse call() throws IOException {
  try {
    replayToServer(this.regionInfo, this.entries);
    return null;
  } catch (ServiceException serviceFailure) {
    // Unwrap to the IOException the remote side actually raised.
    throw ProtobufUtil.getRemoteException(serviceFailure);
  }
}
/**
 * Delegates to {@code replayToServer} with this callable's entries.
 *
 * @param timeout timeout forwarded to {@code replayToServer}
 * @return whatever {@code replayToServer} returns
 * @throws IOException if {@code replayToServer} fails
 */
@Override
public ReplicateWALEntryResponse call(int timeout) throws IOException {
  final ReplicateWALEntryResponse response = replayToServer(this.entries, timeout);
  return response;
}
private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout) throws IOException { // check whether we should still replay this entry. If the regions are changed, or the // entry is not coming form the primary region, filter it out because we do not need it. // Regions can change because of (1) region split (2) region merge (3) table recreated boolean skip = false; if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), initialEncodedRegionName)) { skip = true; } if (!entries.isEmpty() && !skip) { Entry[] entriesArray = new Entry[entries.size()]; entriesArray = entries.toArray(entriesArray); // set the region name for the target region replica Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest( entriesArray, location.getRegionInfo().getEncodedNameAsBytes()); try { PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); controller.setCallTimeout(timeout); controller.setPriority(tableName); return stub.replay(controller, p.getFirst()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } } if (skip) { if (LOG.isTraceEnabled()) { LOG.trace("Skipping " + entries.size() + " entries in table " + tableName + " because located region " + location.getRegionInfo().getEncodedName() + " is different than the original region " + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); for (Entry entry : entries) { LOG.trace("Skipping : " + entry); } } skippedEntries.addAndGet(entries.size()); } return ReplicateWALEntryResponse.newBuilder().build(); }
@Override public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { // TODO Auto-generated method stub return null; }
/** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTime(); CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); if (entries == null || entries.isEmpty()) { // empty input return ReplicateWALEntryResponse.newBuilder().build(); } HRegion region = regionServer.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>(); for (WALEntry entry : entries) { if (regionServer.nonceManager != null) { long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime()); } Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<WALKey, WALEdit>(); List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry, cells, walEntry); if (coprocessorHost != null) { // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // KeyValue. if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), walEntry.getSecond())) { // if bypass this log entry, ignore it ... 
continue; } walEntries.add(walEntry); } if (edits != null && !edits.isEmpty()) { long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { if (result[i] != OperationStatus.SUCCESS) { throw new IOException(result[i].getExceptionMsg()); } } } } //sync wal at the end because ASYNC_WAL is used above region.syncWal(); if (coprocessorHost != null) { for (Pair<WALKey, WALEdit> wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } finally { if (regionServer.metricsRegionServer != null) { regionServer.metricsRegionServer.updateReplay( EnvironmentEdgeManager.currentTime() - before); } } }
/** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTimeMillis(); CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); if (entries == null || entries.isEmpty()) { // empty input return ReplicateWALEntryResponse.newBuilder().build(); } HRegion region = this.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>(); List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>(); // when tag is enabled, we need tag replay edits with log sequence number boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3); for (WALEntry entry : entries) { if (nonceManager != null) { long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime()); } Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<HLogKey, WALEdit>(); List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry, needAddReplayTag); if (coprocessorHost != null) { // Start coprocessor replay here. 
The coprocessor is for each WALEdit instead of a // KeyValue. if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), walEntry.getSecond())) { // if bypass this log entry, ignore it ... continue; } walEntries.add(walEntry); } mutations.addAll(edits); } if (!mutations.isEmpty()) { OperationStatus[] result = doReplayBatchOp(region, mutations); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { if (result[i] != OperationStatus.SUCCESS) { throw new IOException(result[i].getExceptionMsg()); } } } if (coprocessorHost != null) { for (Pair<HLogKey, WALEdit> wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } finally { metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before); } }
/** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTimeMillis(); CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); if (entries == null || entries.isEmpty()) { // empty input return ReplicateWALEntryResponse.newBuilder().build(); } HRegion region = regionServer.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>(); List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>(); // when tag is enabled, we need tag replay edits with log sequence number boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3); for (WALEntry entry : entries) { if (regionServer.nonceManager != null) { long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime()); } Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<HLogKey, WALEdit>(); List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry, needAddReplayTag); if (coprocessorHost != null) { // Start coprocessor replay here. 
The coprocessor is for each WALEdit instead of a // KeyValue. if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), walEntry.getSecond())) { // if bypass this log entry, ignore it ... continue; } walEntries.add(walEntry); } mutations.addAll(edits); } if (!mutations.isEmpty()) { OperationStatus[] result = doReplayBatchOp(region, mutations); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { if (result[i] != OperationStatus.SUCCESS) { throw new IOException(result[i].getExceptionMsg()); } } } if (coprocessorHost != null) { for (Pair<HLogKey, WALEdit> wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } finally { if (regionServer.metricsRegionServer != null) { regionServer.metricsRegionServer.updateReplay( EnvironmentEdgeManager.currentTimeMillis() - before); } } }
/** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTimeMillis(); CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); if (entries == null || entries.isEmpty()) { // empty input return ReplicateWALEntryResponse.newBuilder().build(); } HRegion region = this.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>(); List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>(); for (WALEntry entry : entries) { Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<HLogKey, WALEdit>(); List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry); if (coprocessorHost != null) { // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // KeyValue. if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), walEntry.getSecond())) { // if bypass this log entry, ignore it ... 
continue; } walEntries.add(walEntry); } mutations.addAll(edits); } if (!mutations.isEmpty()) { OperationStatus[] result = doBatchOp(region, mutations, true); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { if (result[i] != OperationStatus.SUCCESS) { throw new IOException(result[i].getExceptionMsg()); } } } if (coprocessorHost != null) { for (Pair<HLogKey, WALEdit> wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } finally { metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before); } }