/** * Randomly pick a connection and process the batch of actions for a given table * @param actions the actions * @param tableName table name * @param results the results array * @param callback * @throws IOException */ @SuppressWarnings("deprecation") public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName, Object[] results, Batch.Callback<R> callback) throws IOException { // Currently used by RegionStateStore // A deprecated method is used as multiple threads accessing RegionStateStore do a single put // and htable is not thread safe. Alternative would be to create an Htable instance for each // put but that is not very efficient. // See HBASE-11610 for more details. try { hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].processBatchCallback( actions, tableName, this.batchPool, results, callback); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } }
/**
 * Invokes the {@code sum} endpoint of {@code ColumnAggregationService} through
 * {@code table.coprocessorService} for the key range {@code [start, end]}.
 *
 * @param table table the coprocessor call is issued against
 * @param family column family placed in the request
 * @param qualifier column qualifier; only placed in the request when non-null and non-empty
 * @param start start key passed to {@code coprocessorService}
 * @param end end key passed to {@code coprocessorService}
 * @return the map returned by {@code coprocessorService}, holding one sum per entry
 */
private static Map<byte [], Long> sum(final Table table, final byte [] family,
    final byte [] qualifier, final byte [] start, final byte [] end)
    throws ServiceException, Throwable {
  Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long> sumCall =
      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
        @Override
        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
            throws IOException {
          ColumnAggregationProtos.SumRequest.Builder request =
              ColumnAggregationProtos.SumRequest.newBuilder();
          request.setFamily(ByteStringer.wrap(family));
          boolean hasQualifier = qualifier != null && qualifier.length > 0;
          if (hasQualifier) {
            request.setQualifier(ByteStringer.wrap(qualifier));
          }
          BlockingRpcCallback<ColumnAggregationProtos.SumResponse> done =
              new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
          instance.sum(null, request.build(), done);
          ColumnAggregationProtos.SumResponse response = done.get();
          return response.getSum();
        }
      };
  return table.coprocessorService(
      ColumnAggregationProtos.ColumnAggregationService.class, start, end, sumCall);
}
/**
 * Asks every {@code ColumnAggregationService} instance selected by the key range for its sum.
 *
 * @param table table the coprocessor call is issued against
 * @param family column family set on the request
 * @param qualifier optional qualifier; skipped when null or zero-length
 * @param start start key passed to {@code coprocessorService}
 * @param end end key passed to {@code coprocessorService}
 * @return the map returned by {@code table.coprocessorService}
 */
private Map<byte [], Long> sum(final Table table, final byte [] family,
    final byte [] qualifier, final byte [] start, final byte [] end)
    throws ServiceException, Throwable {
  Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long> aggregate =
      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
        @Override
        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
            throws IOException {
          ColumnAggregationProtos.SumRequest.Builder req =
              ColumnAggregationProtos.SumRequest.newBuilder();
          req.setFamily(ByteStringer.wrap(family));
          if (qualifier != null && qualifier.length > 0) {
            req.setQualifier(ByteStringer.wrap(qualifier));
          }
          BlockingRpcCallback<ColumnAggregationProtos.SumResponse> waiter =
              new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
          instance.sum(null, req.build(), waiter);
          // get() hands back the response delivered to the callback
          return waiter.get().getSum();
        }
      };
  return table.coprocessorService(
      ColumnAggregationProtos.ColumnAggregationService.class, start, end, aggregate);
}
/**
 * Calls {@code PingService.hello} through {@code table.coprocessorService}.
 *
 * @param table table the coprocessor call is issued against
 * @param send name to put in the request; left unset when null
 * @param start start key passed to {@code coprocessorService}
 * @param end end key passed to {@code coprocessorService}
 * @return the map returned by {@code coprocessorService}; a value is null when the
 *         response is missing or carries no response field
 */
private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
    final byte [] end) throws ServiceException, Throwable {
  Batch.Call<PingProtos.PingService, String> helloCall =
      new Batch.Call<PingProtos.PingService, String>() {
        @Override
        public String call(PingProtos.PingService instance) throws IOException {
          PingProtos.HelloRequest.Builder request = PingProtos.HelloRequest.newBuilder();
          if (send != null) {
            request.setName(send);
          }
          BlockingRpcCallback<PingProtos.HelloResponse> done =
              new BlockingRpcCallback<PingProtos.HelloResponse>();
          instance.hello(null, request.build(), done);
          PingProtos.HelloResponse reply = done.get();
          if (reply == null || !reply.hasResponse()) {
            return null;
          }
          return reply.getResponse();
        }
      };
  return table.coprocessorService(PingProtos.PingService.class, start, end, helloCall);
}
private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start, final byte [] end) throws ServiceException, Throwable { return table.coprocessorService(PingProtos.PingService.class, start, end, new Batch.Call<PingProtos.PingService, String>() { @Override public String call(PingProtos.PingService instance) throws IOException { BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback = new BlockingRpcCallback<PingProtos.HelloResponse>(); PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder(); // Call ping on same instance. Use result calling hello on same instance. builder.setName(doPing(instance)); instance.hello(null, builder.build(), rpcCallback); PingProtos.HelloResponse r = rpcCallback.get(); return r != null && r.hasResponse()? r.getResponse(): null; } }); }
private Map<byte [], String> noop(final Table table, final byte [] start, final byte [] end) throws ServiceException, Throwable { return table.coprocessorService(PingProtos.PingService.class, start, end, new Batch.Call<PingProtos.PingService, String>() { @Override public String call(PingProtos.PingService instance) throws IOException { BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback = new BlockingRpcCallback<PingProtos.NoopResponse>(); PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder(); instance.noop(null, builder.build(), rpcCallback); rpcCallback.get(); // Looks like null is expected when void. That is what the test below is looking for return null; } }); }
/**
 * Send the queries in parallel on the different region servers. Retries on failures.
 * A normal return means no error occurred and the 'results' array holds no exception.
 * When an error occurred an exception is thrown, and the 'results' array holds both
 * results and exceptions.
 *
 * @param list the rows to submit
 * @param tableName table the submission is made for
 * @param pool executor handed to {@code asyncProcess.submitAll}
 * @param results array handed to {@code asyncProcess.submitAll}
 * @param callback callback handed to {@code asyncProcess.submitAll}
 * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
 */
@Override
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
    ExecutorService pool, Object[] results, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  AsyncRequestFuture future =
      this.asyncProcess.submitAll(pool, tableName, list, callback, results);
  future.waitUntilDone();
  if (!future.hasError()) {
    return;
  }
  throw future.getErrors();
}
/**
 * {@inheritDoc}
 * <p>
 * This overload delegates to the callback-taking {@code coprocessorService} and collects
 * each reported value into a synchronized map ordered by {@code Bytes.BYTES_COMPARATOR},
 * keyed by the region passed to the callback. Updates with a null region are dropped.
 */
@Override
public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
    byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
    throws ServiceException, Throwable {
  final Map<byte[],R> perRegion =
      Collections.synchronizedMap(new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
  Batch.Callback<R> collector = new Batch.Callback<R>() {
    @Override
    public void update(byte[] region, byte[] row, R value) {
      if (region == null) {
        return;
      }
      perRegion.put(region, value);
    }
  };
  coprocessorService(service, startKey, endKey, callable, collector);
  return perRegion;
}
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName, List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, Object[] results, boolean needResults, List<Exception> locationErrors, List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( tableName, retainedActions, nonceGroup, pool, callback, results, needResults); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { int originalIndex = locationErrorRows.get(i); Row row = retainedActions.get(originalIndex).getAction(); ars.manageError(originalIndex, row, Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); } } ars.sendMultiAction(actionsByServer, 1, null, false); return ars; }
/** * Submit immediately the list of rows, whatever the server status. Kept for backward * compatibility: it allows to be used with the batch interface that return an array of objects. * * @param pool ExecutorService to use. * @param tableName name of the table for which the submission is made. * @param rows the list of rows. * @param callback the callback. * @param results Optional array to return the results thru; backward compat. */ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size()); // The position will be used by the processBatch to match the object array returned. int posInList = -1; NonceGenerator ng = this.connection.getNonceGenerator(); for (Row r : rows) { posInList++; if (r instanceof Put) { Put put = (Put) r; if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); } } Action<Row> action = new Action<Row>(r, posInList); setNonce(ng, r, action); actions.add(action); } AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null); ars.groupAndSendMultiAction(actions, 1); return ars; }
/**
 * Submits a single put with a callback and checks that the callback was invoked exactly once
 * by the time the request future reports completion.
 */
@Test
public void testSubmitWithCB() throws Exception {
  ClusterConnection hc = createHConnection();
  final AtomicInteger updateCalled = new AtomicInteger(0);
  Batch.Callback<Object> cb = new Batch.Callback<Object>() {
    @Override
    public void update(byte[] region, byte[] row, Object result) {
      updateCalled.incrementAndGet();
    }
  };
  AsyncProcess ap = new MyAsyncProcess(hc, conf);

  List<Put> puts = new ArrayList<Put>();
  puts.add(createPut(1, true));

  final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
  // The test expects submit() to have taken the put out of the caller's list.
  Assert.assertTrue(puts.isEmpty());
  ars.waitUntilDone();
  // JUnit's signature is assertEquals(expected, actual); the expected value goes first so a
  // failure message reports the right numbers.
  Assert.assertEquals(1, updateCalled.get());
}
/**
 * Runs {@code BulkDeleteProtocol.delete} over the scan's start/stop row range and adds up the
 * rows-deleted count of every response.
 *
 * @param tableName table to open
 * @param scan scan passed to the endpoint; its start and stop rows bound the call
 * @param rowBatchSize batch size passed to the endpoint
 * @param deleteType delete type passed to the endpoint
 * @param timeStamp timestamp passed to the endpoint
 * @return total of {@code getRowsDeleted()} over all responses
 */
private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
    final byte deleteType, final Long timeStamp) throws Throwable {
  HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
  try {
    Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable =
        new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
          public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
            return instance.delete(scan, deleteType, timeStamp, rowBatchSize);
          }
        };
    Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
        scan.getStartRow(), scan.getStopRow(), callable);
    long noOfDeletedRows = 0L;
    for (BulkDeleteResponse response : result.values()) {
      noOfDeletedRows += response.getRowsDeleted();
    }
    return noOfDeletedRows;
  } finally {
    // Release the table even when the endpoint call throws.
    ht.close();
  }
}
/**
 * Feeds the result of {@code ping()} into {@code hello()} on the same protocol instance and
 * checks the result for ROW_A, ROW_B and ROW_C.
 */
@Test
public void testCompountCall() throws Throwable {
  HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
  try {
    Map<byte[],String> results = table.coprocessorExec(PingProtocol.class, ROW_A, ROW_C,
        new Batch.Call<PingProtocol,String>() {
          public String call(PingProtocol instance) {
            return instance.hello(instance.ping());
          }
        });
    verifyRegionResults(table, results, "Hello, pong", ROW_A);
    verifyRegionResults(table, results, "Hello, pong", ROW_B);
    verifyRegionResults(table, results, "Hello, pong", ROW_C);
  } finally {
    // Close the table even when an assertion fails.
    table.close();
  }
}
/**
 * Passes a null argument to {@code hello()} and checks the result for ROW_A, ROW_B and ROW_C.
 */
@Test
public void testNullCall() throws Throwable {
  HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
  try {
    Map<byte[],String> results = table.coprocessorExec(PingProtocol.class, ROW_A, ROW_C,
        new Batch.Call<PingProtocol,String>() {
          public String call(PingProtocol instance) {
            return instance.hello(null);
          }
        });
    verifyRegionResults(table, results, "Who are you?", ROW_A);
    verifyRegionResults(table, results, "Who are you?", ROW_B);
    verifyRegionResults(table, results, "Who are you?", ROW_C);
  } finally {
    // Close the table even when an assertion fails.
    table.close();
  }
}
/**
 * Calls {@code hello("nobody")} and checks that a null result is recorded for ROW_A, ROW_B
 * and ROW_C.
 */
@Test
public void testNullReturn() throws Throwable {
  HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
  try {
    Map<byte[],String> results = table.coprocessorExec(PingProtocol.class, ROW_A, ROW_C,
        new Batch.Call<PingProtocol,String>() {
          public String call(PingProtocol instance) {
            return instance.hello("nobody");
          }
        });
    verifyRegionResults(table, results, null, ROW_A);
    verifyRegionResults(table, results, null, ROW_B);
    verifyRegionResults(table, results, null, ROW_C);
  } finally {
    // Close the table even when an assertion fails.
    table.close();
  }
}
@Test public void testVoidReturnType() throws Throwable { HTable table = new HTable(util.getConfiguration(), TEST_TABLE); Map<byte[],Object> results = table.coprocessorExec(PingProtocol.class, ROW_A, ROW_C, new Batch.Call<PingProtocol,Object>(){ public Object call(PingProtocol instance) { instance.noop(); return null; } }); assertEquals("Should have results from three regions", 3, results.size()); // all results should be null for (Object v : results.values()) { assertNull(v); } }
/** * Randomly pick a connection and process the batch of actions for a given table * @param actions the actions * @param tableName table name * @param results the results array * @param callback * @throws IOException * @throws InterruptedException */ @SuppressWarnings("deprecation") public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName, Object[] results, Batch.Callback<R> callback) throws IOException { // Currently used by RegionStateStore // A deprecated method is used as multiple threads accessing RegionStateStore do a single put // and htable is not thread safe. Alternative would be to create an Htable instance for each // put but that is not very efficient. // See HBASE-11610 for more details. try { hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].processBatchCallback( actions, tableName, this.batchPool, results, callback); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } }
/**
 * Send the queries in parallel on the different region servers. Retries on failures.
 * Returning normally means that no error occurred and that the 'results' array contains
 * no exception. On error an exception is thrown and the 'results' array contains a mix of
 * results and exceptions.
 *
 * @param list the rows to submit
 * @param tableName table the submission is made for
 * @param pool executor handed to {@code asyncProcess.submitAll}
 * @param results array handed to {@code asyncProcess.submitAll}
 * @param callback callback handed to {@code asyncProcess.submitAll}
 * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
 */
@Override
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
    ExecutorService pool, Object[] results, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  AsyncRequestFuture pending =
      this.asyncProcess.submitAll(pool, tableName, list, callback, results);
  pending.waitUntilDone();
  boolean failed = pending.hasError();
  if (failed) {
    throw pending.getErrors();
  }
}
/**
 * Invokes the {@code sum} endpoint of {@code ColumnAggregationService} through
 * {@code table.coprocessorService} for the key range {@code [start, end]}.
 *
 * @param table table the coprocessor call is issued against
 * @param family column family placed in the request
 * @param qualifier column qualifier; only placed in the request when non-null and non-empty
 * @param start start key passed to {@code coprocessorService}
 * @param end end key passed to {@code coprocessorService}
 * @return the map returned by {@code coprocessorService}, holding one sum per entry
 */
private Map<byte [], Long> sum(final HTable table, final byte [] family,
    final byte [] qualifier, final byte [] start, final byte [] end)
    throws ServiceException, Throwable {
  Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long> sumCall =
      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
        @Override
        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
            throws IOException {
          ColumnAggregationProtos.SumRequest.Builder request =
              ColumnAggregationProtos.SumRequest.newBuilder();
          request.setFamily(HBaseZeroCopyByteString.wrap(family));
          boolean hasQualifier = qualifier != null && qualifier.length > 0;
          if (hasQualifier) {
            request.setQualifier(HBaseZeroCopyByteString.wrap(qualifier));
          }
          BlockingRpcCallback<ColumnAggregationProtos.SumResponse> done =
              new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
          instance.sum(null, request.build(), done);
          ColumnAggregationProtos.SumResponse response = done.get();
          return response.getSum();
        }
      };
  return table.coprocessorService(
      ColumnAggregationProtos.ColumnAggregationService.class, start, end, sumCall);
}
private Map<byte [], String> compoundOfHelloAndPing(final HTable table, final byte [] start, final byte [] end) throws ServiceException, Throwable { return table.coprocessorService(PingProtos.PingService.class, start, end, new Batch.Call<PingProtos.PingService, String>() { @Override public String call(PingProtos.PingService instance) throws IOException { BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback = new BlockingRpcCallback<PingProtos.HelloResponse>(); PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder(); // Call ping on same instance. Use result calling hello on same instance. builder.setName(doPing(instance)); instance.hello(null, builder.build(), rpcCallback); PingProtos.HelloResponse r = rpcCallback.get(); return r != null && r.hasResponse()? r.getResponse(): null; } }); }
/** * Send the queries in parallel on the different region servers. Retries on failures. * If the method returns it means that there is no error, and the 'results' array will * contain no exception. On error, an exception is thrown, and the 'results' array will * contain results and exceptions. * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead */ @Override @Deprecated public <R> void processBatchCallback( List<? extends Row> list, TableName tableName, ExecutorService pool, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { // To fulfill the original contract, we have a special callback. This callback // will set the results in the Object array. ObjectResultFiller<R> cb = new ObjectResultFiller<R>(results, callback); AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf); // We're doing a submit all. This way, the originalIndex will match the initial list. asyncProcess.submitAll(list); asyncProcess.waitUntilDone(); if (asyncProcess.hasError()) { throw asyncProcess.getErrors(); } }
@Test public void testSingleMethod() throws Throwable { try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) { RegionLocator locator = table.getRegionLocator(); Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class, null, ROW_A, new Batch.Call<PingProtos.PingService, String>() { @Override public String call(PingProtos.PingService instance) throws IOException { BlockingRpcCallback<PingProtos.PingResponse> rpcCallback = new BlockingRpcCallback<PingProtos.PingResponse>(); instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback); return rpcCallback.get().getPong(); } }); // Should have gotten results for 1 of the three regions only since we specified // rows from 1 region assertEquals(1, results.size()); verifyRegionResults(locator, results, ROW_A); final String name = "NAME"; results = hello(table, name, null, ROW_A); // Should have gotten results for 1 of the three regions only since we specified // rows from 1 region assertEquals(1, results.size()); verifyRegionResults(locator, results, "Hello, NAME", ROW_A); } }
/**
 * Runs {@code doPing} against every {@code PingService} instance selected by the key range.
 *
 * @param table table the coprocessor call is issued against
 * @param start start key passed to {@code coprocessorService}
 * @param end end key passed to {@code coprocessorService}
 * @return the map returned by {@code table.coprocessorService}
 */
private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
    throws ServiceException, Throwable {
  Batch.Call<PingProtos.PingService, String> pingCall =
      new Batch.Call<PingProtos.PingService, String>() {
        @Override
        public String call(PingProtos.PingService instance) throws IOException {
          return doPing(instance);
        }
      };
  return table.coprocessorService(PingProtos.PingService.class, start, end, pingCall);
}
/** * @param connection the Connection instance to use. * @param user * @return labels, the given user is globally authorized for. * @throws Throwable */ public static GetAuthsResponse getAuths(Connection connection, final String user) throws Throwable { try (Table table = connection.getTable(LABELS_TABLE_NAME)) { Batch.Call<VisibilityLabelsService, GetAuthsResponse> callable = new Batch.Call<VisibilityLabelsService, GetAuthsResponse>() { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<GetAuthsResponse> rpcCallback = new BlockingRpcCallback<GetAuthsResponse>(); public GetAuthsResponse call(VisibilityLabelsService service) throws IOException { GetAuthsRequest.Builder getAuthReqBuilder = GetAuthsRequest.newBuilder(); getAuthReqBuilder.setUser(ByteStringer.wrap(Bytes.toBytes(user))); service.getAuths(controller, getAuthReqBuilder.build(), rpcCallback); GetAuthsResponse response = rpcCallback.get(); if (controller.failedOnException()) { throw controller.getFailedOn(); } return response; } }; Map<byte[], GetAuthsResponse> result = table.coprocessorService(VisibilityLabelsService.class, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable); return result.values().iterator().next(); // There will be exactly one region for labels // table and so one entry in result Map. } }
private static VisibilityLabelsResponse setOrClearAuths(Connection connection, final String[] auths, final String user, final boolean setOrClear) throws IOException, ServiceException, Throwable { try (Table table = connection.getTable(LABELS_TABLE_NAME)) { Batch.Call<VisibilityLabelsService, VisibilityLabelsResponse> callable = new Batch.Call<VisibilityLabelsService, VisibilityLabelsResponse>() { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<VisibilityLabelsResponse> rpcCallback = new BlockingRpcCallback<VisibilityLabelsResponse>(); public VisibilityLabelsResponse call(VisibilityLabelsService service) throws IOException { SetAuthsRequest.Builder setAuthReqBuilder = SetAuthsRequest.newBuilder(); setAuthReqBuilder.setUser(ByteStringer.wrap(Bytes.toBytes(user))); for (String auth : auths) { if (auth.length() > 0) { setAuthReqBuilder.addAuth(ByteStringer.wrap(Bytes.toBytes(auth))); } } if (setOrClear) { service.setAuths(controller, setAuthReqBuilder.build(), rpcCallback); } else { service.clearAuths(controller, setAuthReqBuilder.build(), rpcCallback); } VisibilityLabelsResponse response = rpcCallback.get(); if (controller.failedOnException()) { throw controller.getFailedOn(); } return response; } }; Map<byte[], VisibilityLabelsResponse> result = table.coprocessorService( VisibilityLabelsService.class, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable); return result.values().iterator().next(); // There will be exactly one region for labels // table and so one entry in result Map. } }
/**
 * Parameterized batch processing, allowing varying return types for different
 * {@link Row} implementations.
 *
 * @param <R> type parameter of the supplied {@code Batch.Callback}
 * @param list the rows to process
 * @param tableName the table the rows are for
 * @param pool executor supplied by the caller (NOTE(review): presumably runs the per-server
 *          requests; confirm against the implementation)
 * @param results caller-supplied array (NOTE(review): presumably filled with one entry per
 *          row of {@code list}; confirm against the implementation)
 * @param callback callback supplied by the caller
 * @throws IOException declared by the signature; conditions are implementation-defined
 * @throws InterruptedException declared by the signature; conditions are
 *           implementation-defined
 * @deprecated since 0.96 - Use {@link HTableInterface#batchCallback} instead
 */
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list, final TableName tableName, ExecutorService pool, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException;
/**
 * Overload taking the table name as raw bytes: converts it with {@code TableName.valueOf}
 * and delegates to the {@code TableName} variant with all other arguments unchanged.
 */
@Override
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list, byte[] tableName,
    ExecutorService pool, Object[] results, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  TableName resolvedName = TableName.valueOf(tableName);
  processBatchCallback(list, resolvedName, pool, results, callback);
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@code connection.processBatchCallback}, supplying this object's
 * {@code tableName} and {@code pool}.
 */
@Override
public <R> void batchCallback(final List<? extends Row> actions, final Object[] results,
    final Batch.Callback<R> callback) throws IOException, InterruptedException {
  connection.processBatchCallback(
      actions,
      tableName,
      pool,
      results,
      callback);
}
/**
 * {@inheritDoc}
 * <p>
 * Allocates a result array with one slot per action and delegates to the overload that
 * takes the array.
 *
 * @deprecated If any exception is thrown by one of the actions, there is no way to
 * retrieve the partially executed results. Use
 * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
 * instead.
 */
@Deprecated
@Override
public <R> Object[] batchCallback(final List<? extends Row> actions,
    final Batch.Callback<R> callback) throws IOException, InterruptedException {
  final int actionCount = actions.size();
  Object[] outcomes = new Object[actionCount];
  batchCallback(actions, outcomes, callback);
  return outcomes;
}
/**
 * Calls {@code checkState()} and then forwards the call unchanged to the wrapped
 * {@code table}.
 *
 * @return the map returned by the wrapped table
 */
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
    byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
    throws ServiceException, Throwable {
  checkState();
  Map<byte[], R> delegated = table.coprocessorService(service, startKey, endKey, callable);
  return delegated;
}
/**
 * Calls {@code checkState()} and then forwards the call, including the caller's callback,
 * unchanged to the wrapped {@code table}.
 */
@Override
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
    byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
    throws ServiceException, Throwable {
  checkState();
  table.coprocessorService(
      service,
      startKey,
      endKey,
      callable,
      callback);
}
/**
 * Create AsyncRequestFuture. Isolated to be easily overridden in the tests.
 * <p>
 * The supplied pool is passed through {@code getPool} before being handed to the future.
 */
@VisibleForTesting
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
    TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
    Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
  ExecutorService effectivePool = getPool(pool);
  return new AsyncRequestFutureImpl<CResult>(
      tableName, actions, nonceGroup, effectivePool, needResults, results, callback);
}
@Override protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<Res> callback, Object[] results, boolean needResults) { // Test HTable has tableName of null, so pass DUMMY_TABLE AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture( DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); allReqs.add(r); callsCt.incrementAndGet(); return r; }