/**
 * Delegates to the superclass {@code mutateRows} and records how long that call took.
 *
 * <p>The elapsed time is measured with {@link System#nanoTime()} and handed to
 * {@code endpointExecution.updateNanos}. If the superclass call throws, nothing is recorded.
 *
 * @param controller RPC controller, passed through untouched
 * @param request the mutate-rows request, passed through untouched
 * @param done completion callback, passed through untouched
 */
@Override
public void mutateRows(RpcController controller, MutateRowsRequest request,
    RpcCallback<MutateRowsResponse> done) {
  final long beganAtNanos = System.nanoTime();
  super.mutateRows(controller, request, done);
  final long elapsedNanos = System.nanoTime() - beganAtNanos;
  endpointExecution.updateNanos(elapsedNanos);
}
/** * Performs an atomic multi-mutate operation against the given table. */ // Used by the RSGroup Coprocessor Endpoint. It had a copy/paste of the below. Need to reveal // this facility for CPEP use or at least those CPEPs that are on their way to becoming part of // core as is the intent for RSGroup eventually. public static void multiMutate(Connection connection, final Table table, byte[] row, final List<Mutation> mutations) throws IOException { debugLogMutations(mutations); // TODO: Need rollback!!!! // TODO: Need Retry!!! // TODO: What for a timeout? Default write timeout? GET FROM HTABLE? // TODO: Review when we come through with ProcedureV2. RegionServerCallable<MutateRowsResponse, MultiRowMutationProtos.MultiRowMutationService.BlockingInterface> callable = new RegionServerCallable<MutateRowsResponse, MultiRowMutationProtos.MultiRowMutationService.BlockingInterface>( connection, table.getName(), row, null/*RpcController not used in this CPEP!*/) { @Override protected MutateRowsResponse rpcCall() throws Exception { final MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder(); for (Mutation mutation : mutations) { if (mutation instanceof Put) { builder.addMutationRequest(ProtobufUtil.toMutation( ClientProtos.MutationProto.MutationType.PUT, mutation)); } else if (mutation instanceof Delete) { builder.addMutationRequest(ProtobufUtil.toMutation( ClientProtos.MutationProto.MutationType.DELETE, mutation)); } else { throw new DoNotRetryIOException("multi in MetaEditor doesn't support " + mutation.getClass().getName()); } } // The call to #prepare that ran before this invocation will have populated HRegionLocation. HRegionLocation hrl = getLocation(); RegionSpecifier region = ProtobufUtil.buildRegionSpecifier( RegionSpecifierType.REGION_NAME, hrl.getRegionInfo().getRegionName()); builder.setRegion(region); // The rpcController here is awkward. 
The Coprocessor Endpoint wants an instance of a // com.google.protobuf but we are going over an rpc that is all shaded protobuf so it // wants a org.apache.h.h.shaded.com.google.protobuf.RpcController. Set up a factory // that makes com.google.protobuf.RpcController and then copy into it configs. return getStub().mutateRows(null, builder.build()); } @Override // Called on the end of the super.prepare call. Set the stub. protected void setStubByServiceName(ServerName serviceName/*Ignored*/) throws IOException { CoprocessorRpcChannel channel = table.coprocessorService(getRow()); setStub(MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel)); } }; int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // The region location should be cached in connection. Call prepare so this callable picks // up the region location (see super.prepare method). callable.prepare(false); callable.call(writeTimeout); }