Java 类org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse 实例源码

项目:ditb    文件:WALEditsReplaySink.java   
/**
 * Sends the given WAL entries to the server at {@code regionLoc} through a retrying RPC caller,
 * passing {@code replayTimeout} as the call timeout.
 *
 * @param regionLoc location of the region that receives the entries
 * @param regionInfo region the entries belong to
 * @param entries WAL entries to replay
 * @throws IOException if the replay fails and {@code skipErrors} is not set
 */
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
    final List<Entry> entries) throws IOException {
  try {
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
    ReplayServerCallable<ReplicateWALEntryResponse> callable =
        new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
            regionInfo, entries);
    factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
  } catch (IOException ie) {
    if (!skipErrors) {
      throw ie;
    }
    // Edits are being dropped here, so keep the full exception (stack trace and cause) in the
    // log instead of only its message.
    LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
        + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
  }
}
项目:ditb    文件:RSRpcServices.java   
/**
 * Applies replicated WAL entries on this region server through the replication sink,
 * invoking the region-server coprocessor pre/post hooks around the sink call.
 *
 * @param controller the RPC controller; its cell scanner carries the entry payload
 * @param request the request holding the WAL entries
 * @return an empty response once the entries were handed to the sink
 * @throws ServiceException if the sink is not initialized, or wrapping any IOException raised
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    checkOpen();
    if (regionServer.replicationSinkHandler == null) {
      throw new ServiceException("Replication services are not initialized yet");
    }
    requestCount.increment();
    final List<WALEntry> walEntries = request.getEntryList();
    final CellScanner payload = ((PayloadCarryingRpcController)controller).cellScanner();
    regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(walEntries, payload);
    regionServer.replicationSinkHandler.replicateLogEntries(walEntries, payload);
    regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(walEntries, payload);
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
项目:ditb    文件:TestRegionReplicaReplicationEndpointNoMaster.java   
/**
 * Drains {@code entries} and replays each one, one RPC per entry, against the region location
 * at replica index 1 for the row of the entry's first cell.
 */
private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
    throws IOException, RuntimeException {
  for (Entry next = entries.poll(); next != null; next = entries.poll()) {
    final byte[] rowKey = next.getEdit().getCells().get(0).getRow();
    final RegionLocations regionLocations = connection.locateRegion(tableName, rowKey, true, true);
    final RegionReplicaReplayCallable replayCallable = new RegionReplicaReplayCallable(
      connection,
      RpcControllerFactory.instantiate(connection.getConfiguration()),
      table.getName(),
      regionLocations.getRegionLocation(1),
      regionLocations.getRegionLocation(1).getRegionInfo(),
      rowKey,
      Lists.newArrayList(next),
      new AtomicLong());

    final RpcRetryingCallerFactory callerFactory =
      RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
    callerFactory.<ReplicateWALEntryResponse> newCaller().callWithRetries(replayCallable, 10000);
  }
}
项目:pbase    文件:WALEditsReplaySink.java   
/**
 * Sends the given WAL entries to the server at {@code regionLoc} through a retrying RPC caller,
 * passing {@code replayTimeout} as the call timeout.
 *
 * @param regionLoc location of the region that receives the entries
 * @param regionInfo region the entries belong to
 * @param entries WAL entries to replay
 * @throws IOException if the replay fails and {@code skipErrors} is not set
 */
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
    final List<Entry> entries) throws IOException {
  try {
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
    ReplayServerCallable<ReplicateWALEntryResponse> callable =
        new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
            regionInfo, entries);
    factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
  } catch (IOException ie) {
    if (!skipErrors) {
      throw ie;
    }
    // Edits are being dropped here, so keep the full exception (stack trace and cause) in the
    // log instead of only its message.
    LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
        + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
  }
}
项目:pbase    文件:RSRpcServices.java   
/**
 * Replicate WAL entries on the region server.
 * <p>
 * The call fails when the replication sink is not available. Returning an empty (successful)
 * response in that case would tell the caller the entries were applied although nothing
 * was written.
 *
 * @param controller the RPC controller; its cell scanner carries the entry payload
 * @param request    the request holding the WAL entries
 * @return an empty response once the entries were handed to the sink
 * @throws ServiceException if the replication sink is not initialized, or wrapping any
 *                          IOException raised while checking the server or applying entries
 */
@Override
@QosPriority(priority = HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
                                                   final ReplicateWALEntryRequest request) throws ServiceException {
    try {
        // Run the server-state check unconditionally, not only when a sink exists.
        checkOpen();
        if (regionServer.replicationSinkHandler == null) {
            throw new ServiceException("Replication services are not initialized yet");
        }
        requestCount.increment();
        List<WALEntry> entries = request.getEntryList();
        CellScanner cellScanner = ((PayloadCarryingRpcController) controller).cellScanner();
        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
        return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
        throw new ServiceException(ie);
    }
}
项目:HIndex    文件:HRegionServer.java   
/**
 * Replicate WAL entries on the region server.
 * <p>
 * The call fails when the replication sink is not available. Returning an empty (successful)
 * response in that case would tell the caller the entries were applied although nothing
 * was written.
 *
 * @param controller the RPC controller; its cell scanner carries the entry payload
 * @param request the request holding the WAL entries
 * @return an empty response once the entries were handed to the sink
 * @throws ServiceException if the replication sink is not initialized, or wrapping any
 *           IOException raised while checking the server or applying entries
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request)
throws ServiceException {
  try {
    // Run the server-state check unconditionally, not only when a sink exists.
    checkOpen();
    if (replicationSinkHandler == null) {
      throw new ServiceException("Replication services are not initialized yet");
    }
    requestCount.increment();
    this.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
      ((PayloadCarryingRpcController)controller).cellScanner());
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
项目:HIndex    文件:WALEditsReplaySink.java   
/**
 * Sends the given WAL entries to the server at {@code regionLoc} through a retrying RPC caller,
 * passing {@code replayTimeout} as the call timeout.
 *
 * @param regionLoc location of the region that receives the entries
 * @param regionInfo region the entries belong to
 * @param entries WAL entries to replay
 * @throws IOException if the replay fails and {@code skipErrors} is not set
 */
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
    final List<HLog.Entry> entries) throws IOException {
  try {
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
    ReplayServerCallable<ReplicateWALEntryResponse> callable =
        new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
            regionInfo, entries);
    factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
  } catch (IOException ie) {
    if (!skipErrors) {
      throw ie;
    }
    // Edits are being dropped here, so keep the full exception (stack trace and cause) in the
    // log instead of only its message.
    LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
        + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
  }
}
项目:PyroDB    文件:WALEditsReplaySink.java   
/**
 * Sends the given WAL entries to the server at {@code regionLoc} through a retrying RPC caller,
 * passing {@code replayTimeout} as the call timeout.
 *
 * @param regionLoc location of the region that receives the entries
 * @param regionInfo region the entries belong to
 * @param entries WAL entries to replay
 * @throws IOException if the replay fails and {@code skipErrors} is not set
 */
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
    final List<HLog.Entry> entries) throws IOException {
  try {
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
    ReplayServerCallable<ReplicateWALEntryResponse> callable =
        new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
            regionInfo, entries);
    factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
  } catch (IOException ie) {
    if (!skipErrors) {
      throw ie;
    }
    // Edits are being dropped here, so keep the full exception (stack trace and cause) in the
    // log instead of only its message.
    LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
        + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
  }
}
项目:PyroDB    文件:RSRpcServices.java   
/**
 * Replicate WAL entries on the region server.
 * <p>
 * The call fails when the replication sink is not available. Returning an empty (successful)
 * response in that case would tell the caller the entries were applied although nothing
 * was written.
 *
 * @param controller the RPC controller; its cell scanner carries the entry payload
 * @param request the request holding the WAL entries
 * @return an empty response once the entries were handed to the sink
 * @throws ServiceException if the replication sink is not initialized, or wrapping any
 *           IOException raised while checking the server or applying entries
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    // Run the server-state check unconditionally, not only when a sink exists.
    checkOpen();
    if (regionServer.replicationSinkHandler == null) {
      throw new ServiceException("Replication services are not initialized yet");
    }
    requestCount.increment();
    regionServer.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
      ((PayloadCarryingRpcController)controller).cellScanner());
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
项目:c5    文件:HRegionServer.java   
/**
 * Replicate WAL entries on the region server.
 * <p>
 * The call fails when the replication sink is not available. Returning an empty (successful)
 * response in that case would tell the caller the entries were applied although nothing
 * was written.
 *
 * @param controller the RPC controller; its cell scanner carries the entry payload
 * @param request the request holding the WAL entries
 * @return an empty response once the entries were handed to the sink
 * @throws ServiceException if the replication sink is not initialized, or wrapping any
 *           IOException raised while checking the server or applying entries
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request)
throws ServiceException {
  try {
    // Run the server-state check unconditionally, not only when a sink exists.
    checkOpen();
    if (replicationSinkHandler == null) {
      throw new ServiceException("Replication services are not initialized yet");
    }
    requestCount.increment();
    this.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
      ((PayloadCarryingRpcController)controller).cellScanner());
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
项目:c5    文件:WALEditsReplaySink.java   
/**
 * Sends the given WAL entries to the server at {@code regionLoc} through a retrying RPC caller,
 * passing {@code replayTimeout} as the call timeout.
 *
 * @param regionLoc location of the region that receives the entries
 * @param regionInfo region the entries belong to
 * @param entries WAL entries to replay
 * @throws IOException if the replay fails and {@code skipErrors} is not set
 */
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
    final List<HLog.Entry> entries) throws IOException {
  try {
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
    ReplayServerCallable<ReplicateWALEntryResponse> callable =
        new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
            regionInfo, entries);
    factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
  } catch (IOException ie) {
    if (!skipErrors) {
      throw ie;
    }
    // Edits are being dropped here, so keep the full exception (stack trace and cause) in the
    // log instead of only its message.
    LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
        + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
  }
}
项目:DominoHBase    文件:HRegionServer.java   
/**
 * Replicate WAL entries on the region server.
 * <p>
 * The call fails when the replication sink is not available. Returning an empty (successful)
 * response in that case would tell the caller the entries were applied although nothing
 * was written.
 *
 * @param controller the RPC controller
 * @param request the request holding the WAL entries
 * @return an empty response once the (possibly empty) entry list was processed
 * @throws ServiceException if the replication sink is not initialized, or wrapping any
 *           IOException raised while checking the server or applying entries
 */
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  try {
    // Run the server-state check unconditionally, not only when a sink exists.
    checkOpen();
    if (replicationSinkHandler == null) {
      throw new ServiceException("Replication services are not initialized yet");
    }
    requestCount.increment();
    HLog.Entry[] entries = ProtobufUtil.toHLogEntries(request.getEntryList());
    // Only call the sink when the request carried at least one entry.
    if (entries != null && entries.length > 0) {
      replicationSinkHandler.replicateLogEntries(entries);
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
项目:ditb    文件:WALEditsReplaySink.java   
/**
 * Replays this callable's entries on the target server. Always returns {@code null};
 * a ServiceException from the RPC is converted to its remote IOException.
 */
@Override
public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
  try {
    replayToServer(regionInfo, entries);
    return null;
  } catch (ServiceException serviceError) {
    throw ProtobufUtil.getRemoteException(serviceError);
  }
}
项目:ditb    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replay} RPC: ignores both arguments and returns {@code null}.
 */
@Override
public ReplicateWALEntryResponse replay(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:pbase    文件:WALEditsReplaySink.java   
/**
 * Replays this callable's entries on the target server. Always returns {@code null};
 * a ServiceException from the RPC is converted to its remote IOException.
 */
@Override
public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
  try {
    replayToServer(regionInfo, entries);
    return null;
  } catch (ServiceException serviceError) {
    throw ProtobufUtil.getRemoteException(serviceError);
  }
}
项目:pbase    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replay} RPC: ignores both arguments and returns {@code null}.
 */
@Override
public ReplicateWALEntryResponse replay(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:HIndex    文件:WALEditsReplaySink.java   
/**
 * Replays this callable's entries on the target server. Always returns {@code null};
 * a ServiceException from the RPC is converted to its remote IOException.
 */
@Override
public ReplicateWALEntryResponse call() throws IOException {
  try {
    replayToServer(regionInfo, entries);
    return null;
  } catch (ServiceException serviceError) {
    throw ProtobufUtil.getRemoteException(serviceError);
  }
}
项目:HIndex    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replay} RPC: ignores both arguments and returns {@code null}.
 */
@Override
public ReplicateWALEntryResponse replay(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:PyroDB    文件:WALEditsReplaySink.java   
/**
 * Replays this callable's entries on the target server. Always returns {@code null};
 * a ServiceException from the RPC is converted to its remote IOException.
 */
@Override
public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
  try {
    replayToServer(regionInfo, entries);
    return null;
  } catch (ServiceException serviceError) {
    throw ProtobufUtil.getRemoteException(serviceError);
  }
}
项目:PyroDB    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replay} RPC: ignores both arguments and returns {@code null}.
 */
@Override
public ReplicateWALEntryResponse replay(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:c5    文件:WALEditsReplaySink.java   
/**
 * Replays this callable's entries on the target server. Always returns {@code null};
 * a ServiceException from the RPC is converted to its remote IOException.
 */
@Override
public ReplicateWALEntryResponse call() throws IOException {
  try {
    replayToServer(regionInfo, entries);
    return null;
  } catch (ServiceException serviceError) {
    throw ProtobufUtil.getRemoteException(serviceError);
  }
}
项目:c5    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replay} RPC: ignores both arguments and returns {@code null}.
 */
@Override
public ReplicateWALEntryResponse replay(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:ditb    文件:RegionReplicaReplicationEndpoint.java   
/**
 * Replays this callable's entries on the target server with the given timeout and
 * returns the resulting response.
 */
@Override
public ReplicateWALEntryResponse call(int timeout) throws IOException {
  final ReplicateWALEntryResponse response = replayToServer(entries, timeout);
  return response;
}
项目:ditb    文件:RegionReplicaReplicationEndpoint.java   
/**
 * Replays {@code entries} on the located region replica, unless the located region no longer
 * matches the region the entries were written for.
 *
 * @param entries WAL entries to replay
 * @param timeout call timeout set on the RPC controller
 * @return the server's response, or an empty response when nothing was sent
 * @throws IOException the remote exception extracted from a failed RPC
 */
private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
    throws IOException {
  // Decide whether these entries should still be replayed. Regions can change because of
  // (1) region split (2) region merge (3) table recreated; entries that do not come from the
  // primary region we started with are filtered out.
  final boolean regionChanged = !Bytes.equals(
    location.getRegionInfo().getEncodedNameAsBytes(), initialEncodedRegionName);

  if (regionChanged) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
        + " because located region " + location.getRegionInfo().getEncodedName()
        + " is different than the original region "
        + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
      for (Entry skipped : entries) {
        LOG.trace("Skipping : " + skipped);
      }
    }
    skippedEntries.addAndGet(entries.size());
    return ReplicateWALEntryResponse.newBuilder().build();
  }

  if (entries.isEmpty()) {
    return ReplicateWALEntryResponse.newBuilder().build();
  }

  final Entry[] batch = entries.toArray(new Entry[entries.size()]);

  // The request is built with the encoded name of the target region replica.
  final Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> requestAndCells =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(
        batch, location.getRegionInfo().getEncodedNameAsBytes());
  try {
    final PayloadCarryingRpcController rpcController =
        rpcControllerFactory.newController(requestAndCells.getSecond());
    rpcController.setCallTimeout(timeout);
    rpcController.setPriority(tableName);
    return stub.replay(rpcController, requestAndCells.getFirst());
  } catch (ServiceException serviceError) {
    throw ProtobufUtil.getRemoteException(serviceError);
  }
}
项目:ditb    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replicateWALEntry} RPC: ignores both arguments and returns
 * {@code null}.
 */
@Override
public ReplicateWALEntryResponse replicateWALEntry(
    RpcController controller, ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:pbase    文件:RSRpcServices.java   
/**
 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
 * that the given mutations will be durable on the receiving RS if this method returns without any
 * exception.
 * <p>
 * Entries are applied one at a time; the WAL is synced once after all entries were applied, and
 * the coprocessor {@code postWALRestore} hooks run only after that sync.
 *
 * @param controller the RPC controller; its cell scanner supplies the cells of the entries
 * @param request    the request holding the WAL entries to replay
 * @return an empty response (also returned immediately for an empty entry list)
 * @throws ServiceException wrapping any IOException raised during replay, including a
 *                          non-successful status of an individual operation
 */
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
                                        final ReplicateWALEntryRequest request) throws ServiceException {
    // Start time, used for the replay metric updated in the finally block.
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
    try {
        checkOpen();
        List<WALEntry> entries = request.getEntryList();
        if (entries == null || entries.isEmpty()) {
            // empty input
            return ReplicateWALEntryResponse.newBuilder().build();
        }
        // Only the first entry's encoded region name is used to look up the region.
        // NOTE(review): this presumably assumes all entries target the same region - confirm.
        HRegion region = regionServer.getRegionByEncodedName(
                entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
        RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
        // Entries that were not bypassed by preWALRestore; they get postWALRestore later.
        List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
        for (WALEntry entry : entries) {
            if (regionServer.nonceManager != null) {
                // Report the entry's nonce (or NO_NONCE when absent) to the nonce manager.
                long nonceGroup = entry.getKey().hasNonceGroup()
                        ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
                long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
                regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
            }
            // The key/edit pair is only allocated when a coprocessor host exists to consume it.
            Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
                    new Pair<WALKey, WALEdit>();
            List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
                    cells, walEntry);
            if (coprocessorHost != null) {
                // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
                // KeyValue.
                if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
                        walEntry.getSecond())) {
                    // if bypass this log entry, ignore it ...
                    continue;
                }
                walEntries.add(walEntry);
            }
            if (edits != null && !edits.isEmpty()) {
                // Prefer the original sequence number when the key carries one.
                long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
                        entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
                OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
                // check if it's a partial success
                for (int i = 0; result != null && i < result.length; i++) {
                    if (result[i] != OperationStatus.SUCCESS) {
                        throw new IOException(result[i].getExceptionMsg());
                    }
                }
            }
        }

        //sync wal at the end because ASYNC_WAL is used above
        region.syncWal();

        if (coprocessorHost != null) {
            // Run the post hook for every entry that was not bypassed.
            for (Pair<WALKey, WALEdit> wal : walEntries) {
                coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
                        wal.getSecond());
            }
        }
        return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
        throw new ServiceException(ie);
    } finally {
        // Record the elapsed time on both the success and the failure path.
        if (regionServer.metricsRegionServer != null) {
            regionServer.metricsRegionServer.updateReplay(
                    EnvironmentEdgeManager.currentTime() - before);
        }
    }
}
项目:pbase    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replicateWALEntry} RPC: ignores both arguments and returns
 * {@code null}.
 */
@Override
public ReplicateWALEntryResponse replicateWALEntry(
    RpcController controller, ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:HIndex    文件:HRegionServer.java   
/**
 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
 * that the given mutations will be durable on the receiving RS if this method returns without any
 * exception.
 * <p>
 * Mutations of all non-bypassed entries are collected first and applied as one batch; the
 * coprocessor {@code postWALRestore} hooks run after the batch was applied.
 * @param controller the RPC controller; its cell scanner supplies the cells of the entries
 * @param request the request holding the WAL entries to replay
 * @return an empty response (also returned immediately for an empty entry list)
 * @throws ServiceException wrapping any IOException raised during replay, including a
 *           non-successful status of an individual operation
 */
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  // Start time, used for the replay metric updated in the finally block.
  long before = EnvironmentEdgeManager.currentTimeMillis();
  CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    // Only the first entry's encoded region name is used to look up the region.
    // NOTE(review): this presumably assumes all entries target the same region - confirm.
    HRegion region = this.getRegionByEncodedName(
      entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    // Entries that were not bypassed by preWALRestore; they get postWALRestore later.
    List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
    // Mutations accumulated across all non-bypassed entries, applied in one batch below.
    List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
    // when tag is enabled, we need tag replay edits with log sequence number
    boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3);
    for (WALEntry entry : entries) {
      if (nonceManager != null) {
        // Report the entry's nonce (or NO_NONCE when absent) to the nonce manager.
        long nonceGroup = entry.getKey().hasNonceGroup()
            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
        long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
        nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
      }
      // The key/edit pair is only allocated when a coprocessor host exists to consume it.
      Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
        new Pair<HLogKey, WALEdit>();
      List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
        cells, walEntry, needAddReplayTag);
      if (coprocessorHost != null) {
        // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
        // KeyValue.
        if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
          walEntry.getSecond())) {
          // if bypass this log entry, ignore it ...
          continue;
        }
        walEntries.add(walEntry);
      }
      mutations.addAll(edits);
    }

    if (!mutations.isEmpty()) {
      OperationStatus[] result = doReplayBatchOp(region, mutations);
      // check if it's a partial success
      for (int i = 0; result != null && i < result.length; i++) {
        if (result[i] != OperationStatus.SUCCESS) {
          throw new IOException(result[i].getExceptionMsg());
        }
      }
    }
    if (coprocessorHost != null) {
      // Run the post hook for every entry that was not bypassed.
      for (Pair<HLogKey, WALEdit> wal : walEntries) {
        coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
          wal.getSecond());
      }
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    // Record the elapsed time on both the success and the failure path.
    metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
  }
}
项目:HIndex    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replicateWALEntry} RPC: ignores both arguments and returns
 * {@code null}.
 */
@Override
public ReplicateWALEntryResponse replicateWALEntry(
    RpcController controller, ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:PyroDB    文件:RSRpcServices.java   
/**
 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
 * that the given mutations will be durable on the receiving RS if this method returns without any
 * exception.
 * <p>
 * Mutations of all non-bypassed entries are collected first and applied as one batch; the
 * coprocessor {@code postWALRestore} hooks run after the batch was applied.
 * @param controller the RPC controller; its cell scanner supplies the cells of the entries
 * @param request the request holding the WAL entries to replay
 * @return an empty response (also returned immediately for an empty entry list)
 * @throws ServiceException wrapping any IOException raised during replay, including a
 *           non-successful status of an individual operation
 */
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  // Start time, used for the replay metric updated in the finally block.
  long before = EnvironmentEdgeManager.currentTimeMillis();
  CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    // Only the first entry's encoded region name is used to look up the region.
    // NOTE(review): this presumably assumes all entries target the same region - confirm.
    HRegion region = regionServer.getRegionByEncodedName(
      entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    // Entries that were not bypassed by preWALRestore; they get postWALRestore later.
    List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
    // Mutations accumulated across all non-bypassed entries, applied in one batch below.
    List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
    // when tag is enabled, we need tag replay edits with log sequence number
    boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
    for (WALEntry entry : entries) {
      if (regionServer.nonceManager != null) {
        // Report the entry's nonce (or NO_NONCE when absent) to the nonce manager.
        long nonceGroup = entry.getKey().hasNonceGroup()
          ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
        long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
        regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
      }
      // The key/edit pair is only allocated when a coprocessor host exists to consume it.
      Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
        new Pair<HLogKey, WALEdit>();
      List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
        cells, walEntry, needAddReplayTag);
      if (coprocessorHost != null) {
        // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
        // KeyValue.
        if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
          walEntry.getSecond())) {
          // if bypass this log entry, ignore it ...
          continue;
        }
        walEntries.add(walEntry);
      }
      mutations.addAll(edits);
    }

    if (!mutations.isEmpty()) {
      OperationStatus[] result = doReplayBatchOp(region, mutations);
      // check if it's a partial success
      for (int i = 0; result != null && i < result.length; i++) {
        if (result[i] != OperationStatus.SUCCESS) {
          throw new IOException(result[i].getExceptionMsg());
        }
      }
    }
    if (coprocessorHost != null) {
      // Run the post hook for every entry that was not bypassed.
      for (Pair<HLogKey, WALEdit> wal : walEntries) {
        coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
          wal.getSecond());
      }
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    // Record the elapsed time on both the success and the failure path.
    if (regionServer.metricsRegionServer != null) {
      regionServer.metricsRegionServer.updateReplay(
        EnvironmentEdgeManager.currentTimeMillis() - before);
    }
  }
}
项目:PyroDB    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replicateWALEntry} RPC: ignores both arguments and returns
 * {@code null}.
 */
@Override
public ReplicateWALEntryResponse replicateWALEntry(
    RpcController controller, ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:c5    文件:HRegionServer.java   
/**
 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
 * that the given mutations will be durable on the receiving RS if this method returns without any
 * exception.
 * <p>
 * Mutations of all non-bypassed entries are collected first and applied as one batch; the
 * coprocessor {@code postWALRestore} hooks run after the batch was applied.
 * @param controller the RPC controller; its cell scanner supplies the cells of the entries
 * @param request the request holding the WAL entries to replay
 * @return an empty response (also returned immediately for an empty entry list)
 * @throws ServiceException wrapping any IOException raised during replay, including a
 *           non-successful status of an individual operation
 */
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  // Start time, used for the replay metric updated in the finally block.
  long before = EnvironmentEdgeManager.currentTimeMillis();
  CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    // Only the first entry's encoded region name is used to look up the region.
    // NOTE(review): this presumably assumes all entries target the same region - confirm.
    HRegion region = this.getRegionByEncodedName(
      entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    // Entries that were not bypassed by preWALRestore; they get postWALRestore later.
    List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
    // Mutations accumulated across all non-bypassed entries, applied in one batch below.
    List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
    for (WALEntry entry : entries) {
      // The key/edit pair is only allocated when a coprocessor host exists to consume it.
      Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
        new Pair<HLogKey, WALEdit>();
      List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry,
        cells, walEntry);
      if (coprocessorHost != null) {
        // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
        // KeyValue.
        if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
          walEntry.getSecond())) {
          // if bypass this log entry, ignore it ...
          continue;
        }
        walEntries.add(walEntry);
      }
      mutations.addAll(edits);
    }

    if (!mutations.isEmpty()) {
      OperationStatus[] result = doBatchOp(region, mutations, true);
      // check if it's a partial success
      for (int i = 0; result != null && i < result.length; i++) {
        if (result[i] != OperationStatus.SUCCESS) {
          throw new IOException(result[i].getExceptionMsg());
        }
      }
    }
    if (coprocessorHost != null) {
      // Run the post hook for every entry that was not bypassed.
      for (Pair<HLogKey, WALEdit> wal : walEntries) {
        coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
          wal.getSecond());
      }
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    // Record the elapsed time on both the success and the failure path.
    metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
  }
}
项目:c5    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replicateWALEntry} RPC: ignores both arguments and returns
 * {@code null}.
 */
@Override
public ReplicateWALEntryResponse replicateWALEntry(
    RpcController controller, ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}
项目:DominoHBase    文件:MockRegionServer.java   
/**
 * Mock stub for the {@code replicateWALEntry} RPC: ignores both arguments and returns
 * {@code null}.
 */
@Override
public ReplicateWALEntryResponse replicateWALEntry(
    RpcController controller, ReplicateWALEntryRequest request) throws ServiceException {
  // Not implemented in this mock.
  return null;
}