/** * {@inheritDoc} */ @Override public <T extends CoprocessorProtocol> T coprocessorProxy( Class<T> protocol, byte[] row) { return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{protocol}, new ExecRPCInvoker(configuration, connection, protocol, tableName, row)); }
/** * Executes the given * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} * callable for each row in the * given list and invokes * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)} * for each result returned. * * @param protocol the protocol interface being called * @param rows a list of row keys for which the callable should be invoked * @param tableName table name for the coprocessor invoked * @param pool ExecutorService used to submit the calls per row * @param callable instance on which to invoke * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} * for each row * @param callback instance on which to invoke * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)} * for each result * @param <T> the protocol interface type * @param <R> the callable's return type * @throws IOException */ public <T extends CoprocessorProtocol,R> void processExecs( final Class<T> protocol, List<byte[]> rows, final byte[] tableName, ExecutorService pool, final Batch.Call<T,R> callable, final Batch.Callback<R> callback) throws IOException, Throwable { Map<byte[],Future<R>> futures = new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR); for (final byte[] r : rows) { final ExecRPCInvoker invoker = new ExecRPCInvoker(conf, this, protocol, tableName, r); Future<R> future = pool.submit( new Callable<R>() { public R call() throws Exception { T instance = (T)Proxy.newProxyInstance(conf.getClassLoader(), new Class[]{protocol}, invoker); R result = callable.call(instance); byte[] region = invoker.getRegionName(); if (callback != null) { callback.update(region, r, result); } return result; } }); futures.put(r, future); } for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { try { e.getValue().get(); } catch (ExecutionException ee) { LOG.warn("Error executing for row "+Bytes.toStringBinary(e.getKey()), ee); throw ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IOException("Interrupted executing for row " + Bytes.toStringBinary(e.getKey()), ie); } } }
/** * {@inheritDoc} */ @Override public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row) { return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] { protocol }, new ExecRPCInvoker(configuration, connection, protocol, tableName, row)); }