/** * Replicate WAL entries on the region server. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority=HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { try { checkOpen(); if (regionServer.replicationSinkHandler != null) { requestCount.increment(); List<WALEntry> entries = request.getEntryList(); CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner); regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner); return ReplicateWALEntryResponse.newBuilder().build(); } else { throw new ServiceException("Replication services are not initialized yet"); } } catch (IOException ie) { throw new ServiceException(ie); } }
/** * Replicate WAL entries on the region server. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { try { if (regionServer.replicationSinkHandler != null) { checkOpen(); requestCount.increment(); List<WALEntry> entries = request.getEntryList(); CellScanner cellScanner = ((PayloadCarryingRpcController) controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner); regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner); } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } }
/** * Replicate WAL entries on the region server. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority=HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { try { if (replicationSinkHandler != null) { checkOpen(); requestCount.increment(); this.replicationSinkHandler.replicateLogEntries(request.getEntryList(), ((PayloadCarryingRpcController)controller).cellScanner()); } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } }
/** * Replicate WAL entries on the region server. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority=HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { try { if (regionServer.replicationSinkHandler != null) { checkOpen(); requestCount.increment(); regionServer.replicationSinkHandler.replicateLogEntries(request.getEntryList(), ((PayloadCarryingRpcController)controller).cellScanner()); } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } }
/** * Replicate WAL entries on the region server. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority=HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { try { if (replicationSinkHandler != null) { checkOpen(); requestCount.increment(); HLog.Entry[] entries = ProtobufUtil.toHLogEntries(request.getEntryList()); if (entries != null && entries.length > 0) { replicationSinkHandler.replicateLogEntries(entries); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } }
@Override public ReplicateWALEntryResponse replay(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { // TODO Auto-generated method stub return null; }
/** * A helper to replicate a list of HLog entries using admin protocol. * * @param admin * @param entries * @throws IOException */ public static void replicateWALEntry(final AdminProtocol admin, final HLog.Entry[] entries) throws IOException { ReplicateWALEntryRequest request = RequestConverter.buildReplicateWALEntryRequest(entries); try { admin.replicateWALEntry(null, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } }
@Override public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { // TODO Auto-generated method stub return null; }
/** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTime(); CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); if (entries == null || entries.isEmpty()) { // empty input return ReplicateWALEntryResponse.newBuilder().build(); } HRegion region = regionServer.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>(); for (WALEntry entry : entries) { if (regionServer.nonceManager != null) { long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime()); } Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<WALKey, WALEdit>(); List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry, cells, walEntry); if (coprocessorHost != null) { // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // KeyValue. if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), walEntry.getSecond())) { // if bypass this log entry, ignore it ... continue; } walEntries.add(walEntry); } if (edits != null && !edits.isEmpty()) { long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { if (result[i] != OperationStatus.SUCCESS) { throw new IOException(result[i].getExceptionMsg()); } } } } //sync wal at the end because ASYNC_WAL is used above region.syncWal(); if (coprocessorHost != null) { for (Pair<WALKey, WALEdit> wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } finally { if (regionServer.metricsRegionServer != null) { regionServer.metricsRegionServer.updateReplay( EnvironmentEdgeManager.currentTime() - before); } } }
/** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTimeMillis(); CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); if (entries == null || entries.isEmpty()) { // empty input return ReplicateWALEntryResponse.newBuilder().build(); } HRegion region = this.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>(); List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>(); // when tag is enabled, we need tag replay edits with log sequence number boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3); for (WALEntry entry : entries) { if (nonceManager != null) { long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime()); } Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<HLogKey, WALEdit>(); List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry, needAddReplayTag); if (coprocessorHost != null) { // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // KeyValue. if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), walEntry.getSecond())) { // if bypass this log entry, ignore it ... continue; } walEntries.add(walEntry); } mutations.addAll(edits); } if (!mutations.isEmpty()) { OperationStatus[] result = doReplayBatchOp(region, mutations); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { if (result[i] != OperationStatus.SUCCESS) { throw new IOException(result[i].getExceptionMsg()); } } } if (coprocessorHost != null) { for (Pair<HLogKey, WALEdit> wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } finally { metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before); } }
/** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTimeMillis(); CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); if (entries == null || entries.isEmpty()) { // empty input return ReplicateWALEntryResponse.newBuilder().build(); } HRegion region = regionServer.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>(); List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>(); // when tag is enabled, we need tag replay edits with log sequence number boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3); for (WALEntry entry : entries) { if (regionServer.nonceManager != null) { long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime()); } Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<HLogKey, WALEdit>(); List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry, needAddReplayTag); if (coprocessorHost != null) { // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // KeyValue. if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), walEntry.getSecond())) { // if bypass this log entry, ignore it ... continue; } walEntries.add(walEntry); } mutations.addAll(edits); } if (!mutations.isEmpty()) { OperationStatus[] result = doReplayBatchOp(region, mutations); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { if (result[i] != OperationStatus.SUCCESS) { throw new IOException(result[i].getExceptionMsg()); } } } if (coprocessorHost != null) { for (Pair<HLogKey, WALEdit> wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } finally { if (regionServer.metricsRegionServer != null) { regionServer.metricsRegionServer.updateReplay( EnvironmentEdgeManager.currentTimeMillis() - before); } } }
/** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTimeMillis(); CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); try { checkOpen(); List<WALEntry> entries = request.getEntryList(); if (entries == null || entries.isEmpty()) { // empty input return ReplicateWALEntryResponse.newBuilder().build(); } HRegion region = this.getRegionByEncodedName( entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>(); List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>(); for (WALEntry entry : entries) { Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<HLogKey, WALEdit>(); List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry); if (coprocessorHost != null) { // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // KeyValue. if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), walEntry.getSecond())) { // if bypass this log entry, ignore it ... continue; } walEntries.add(walEntry); } mutations.addAll(edits); } if (!mutations.isEmpty()) { OperationStatus[] result = doBatchOp(region, mutations, true); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { if (result[i] != OperationStatus.SUCCESS) { throw new IOException(result[i].getExceptionMsg()); } } } if (coprocessorHost != null) { for (Pair<HLogKey, WALEdit> wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), wal.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } finally { metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before); } }
/** * Create a new ReplicateWALEntryRequest from a list of HLog entries * * @param entries the HLog entries to be replicated * @return a ReplicateWALEntryRequest */ public static ReplicateWALEntryRequest buildReplicateWALEntryRequest(final HLog.Entry[] entries) { FamilyScope.Builder scopeBuilder = FamilyScope.newBuilder(); WALEntry.Builder entryBuilder = WALEntry.newBuilder(); ReplicateWALEntryRequest.Builder builder = ReplicateWALEntryRequest.newBuilder(); for (HLog.Entry entry: entries) { entryBuilder.clear(); WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder(); HLogKey key = entry.getKey(); keyBuilder.setEncodedRegionName( ByteString.copyFrom(key.getEncodedRegionName())); keyBuilder.setTableName(ByteString.copyFrom(key.getTablename())); keyBuilder.setLogSequenceNumber(key.getLogSeqNum()); keyBuilder.setWriteTime(key.getWriteTime()); UUID clusterId = key.getClusterId(); if (clusterId != null) { AdminProtos.UUID.Builder uuidBuilder = keyBuilder.getClusterIdBuilder(); uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); } WALEdit edit = entry.getEdit(); WALEntry.WALEdit.Builder editBuilder = entryBuilder.getEditBuilder(); NavigableMap<byte[], Integer> scopes = edit.getScopes(); if (scopes != null && !scopes.isEmpty()) { for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) { scopeBuilder.setFamily(ByteString.copyFrom(scope.getKey())); ScopeType scopeType = ScopeType.valueOf(scope.getValue().intValue()); scopeBuilder.setScopeType(scopeType); editBuilder.addFamilyScope(scopeBuilder.build()); } } List<KeyValue> keyValues = edit.getKeyValues(); for (KeyValue value: keyValues) { editBuilder.addKeyValueBytes(ByteString.copyFrom( value.getBuffer(), value.getOffset(), value.getLength())); } builder.addEntry(entryBuilder.build()); } return builder.build(); }