/**
 * Finds the largest value of a column, within a column family, over the row
 * range described by the scan. When the qualifier is null, the largest value
 * across the whole family is returned.
 *
 * @param tableName name of the table to query
 * @param ci interpreter used to compare values of type {@code R}
 * @param scan scan supplying the start row, stop row and column selection
 * @return the largest value reported by any region, or {@code null} if no
 *         region reported a value
 * @throws Throwable any failure is propagated unchanged; the caller is
 *         expected to handle it
 */
public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
    final Scan scan) throws Throwable {
  validateParameters(scan);

  // Folds the per-region maxima into a single overall maximum.
  class LargestValueCallback implements Batch.Callback<R> {
    R largest = null;

    R getMax() {
      return largest;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      if (largest == null) {
        // Nothing recorded yet: take whatever this region reported.
        largest = result;
        return;
      }
      if (result != null && ci.compare(largest, result) < 0) {
        largest = result;
      }
    }
  }

  final LargestValueCallback collector = new LargestValueCallback();
  final Batch.Call<AggregateProtocol, R> regionCall =
      new Batch.Call<AggregateProtocol, R>() {
        @Override
        public R call(AggregateProtocol instance) throws IOException {
          return instance.getMax(ci, scan);
        }
      };

  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), regionCall, collector);
  } finally {
    table.close();
  }
  return collector.getMax();
}
/**
 * Finds the smallest value of a column, within a column family, over the row
 * range described by the scan. When the qualifier is null, the smallest value
 * across the whole family is returned.
 *
 * @param tableName name of the table to query
 * @param ci interpreter used to compare values of type {@code R}
 * @param scan scan supplying the start row, stop row and column selection
 * @return the smallest value reported by any region, or {@code null} if no
 *         region reported a value
 * @throws Throwable any failure is propagated unchanged to the caller
 */
public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
    final Scan scan) throws Throwable {
  validateParameters(scan);

  // Folds the per-region minima into a single overall minimum.
  class SmallestValueCallback implements Batch.Callback<R> {
    private R smallest = null;

    public R getMinimum() {
      return smallest;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      if (smallest == null) {
        // Nothing recorded yet: take whatever this region reported.
        smallest = result;
        return;
      }
      if (result != null && ci.compare(result, smallest) < 0) {
        smallest = result;
      }
    }
  }

  final SmallestValueCallback collector = new SmallestValueCallback();
  final Batch.Call<AggregateProtocol, R> regionCall =
      new Batch.Call<AggregateProtocol, R>() {
        @Override
        public R call(AggregateProtocol instance) throws IOException {
          return instance.getMin(ci, scan);
        }
      };

  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), regionCall, collector);
  } finally {
    table.close();
  }

  final R overallMin = collector.getMinimum();
  log.debug("Min fom all regions is: " + overallMin);
  return overallMin;
}
/**
 * Counts rows by adding up the per-region row counts. When the qualifier is
 * null, a FirstKeyValueFilter is used to optimise the operation. When a
 * qualifier is given, that filter cannot be used: it may set the flag to skip
 * to the next row even though the value read does not belong to the requested
 * qualifier, in which case the row would wrongly be left out of the count.
 *
 * @param tableName name of the table to query
 * @param ci column interpreter forwarded to each region
 * @param scan scan supplying the start row, stop row and column selection
 * @return the sum of the row counts reported by the regions
 * @throws Throwable any failure is propagated unchanged to the caller
 */
public <R, S> long rowCount(final byte[] tableName,
    final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
  validateParameters(scan);

  // Adds every region's count to a running total.
  class RowTotalCallback implements Batch.Callback<Long> {
    private final AtomicLong total = new AtomicLong(0);

    public long getRowNumCount() {
      return total.get();
    }

    @Override
    public void update(byte[] region, byte[] row, Long result) {
      final long regionRows = result.longValue();
      total.addAndGet(regionRows);
    }
  }

  final RowTotalCallback collector = new RowTotalCallback();
  final Batch.Call<AggregateProtocol, Long> regionCall =
      new Batch.Call<AggregateProtocol, Long>() {
        @Override
        public Long call(AggregateProtocol instance) throws IOException {
          return instance.getRowNum(ci, scan);
        }
      };

  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), regionCall, collector);
  } finally {
    table.close();
  }
  return collector.getRowNumCount();
}
/**
 * Adds together the sums returned by the individual regions. When the
 * qualifier is null, all column qualifiers of the given family are summed.
 *
 * @param tableName name of the table to query
 * @param ci interpreter whose {@code add} combines partial sums of type
 *        {@code S}
 * @param scan scan supplying the start row, stop row and column selection
 * @return the combined sum as produced by {@code ci.add}
 * @throws Throwable any failure is propagated unchanged to the caller
 */
public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
    final Scan scan) throws Throwable {
  validateParameters(scan);

  // Accumulates each region's partial sum through the interpreter.
  class TotalCallback implements Batch.Callback<S> {
    S runningTotal = null;

    public S getSumResult() {
      return runningTotal;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, S result) {
      final S combined = ci.add(runningTotal, result);
      runningTotal = combined;
    }
  }

  final TotalCallback collector = new TotalCallback();
  final Batch.Call<AggregateProtocol, S> regionCall =
      new Batch.Call<AggregateProtocol, S>() {
        @Override
        public S call(AggregateProtocol instance) throws IOException {
          return instance.getSum(ci, scan);
        }
      };

  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), regionCall, collector);
  } finally {
    table.close();
  }
  return collector.getSumResult();
}
/**
 * Gathers the inputs needed for an average: every region reports its own sum
 * and row count, and these are combined into one global sum and one global
 * row count. The division itself is left to the caller.
 *
 * @param tableName name of the table to query
 * @param ci interpreter whose {@code add} combines partial sums of type
 *        {@code S}
 * @param scan scan supplying the start row, stop row and column selection
 * @return a pair of (global sum, global row count)
 * @throws Throwable any failure is propagated unchanged to the caller
 */
private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
    final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
  validateParameters(scan);

  // Accumulates (sum, rowCount) pairs coming back from the regions.
  class AverageInputsCallback implements Batch.Callback<Pair<S, Long>> {
    S totalSum = null;
    long totalRows = 0L;

    public Pair<S, Long> getAvgArgs() {
      return new Pair<S, Long>(totalSum, totalRows);
    }

    @Override
    public synchronized void update(byte[] region, byte[] row,
        Pair<S, Long> result) {
      totalSum = ci.add(totalSum, result.getFirst());
      totalRows += result.getSecond();
    }
  }

  final AverageInputsCallback collector = new AverageInputsCallback();
  final Batch.Call<AggregateProtocol, Pair<S, Long>> regionCall =
      new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
        @Override
        public Pair<S, Long> call(AggregateProtocol instance)
            throws IOException {
          return instance.getAvg(ci, scan);
        }
      };

  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), regionCall, collector);
  } finally {
    table.close();
  }
  return collector.getAvgArgs();
}
/**
 * It gives the maximum value of a column for a given column family for the
 * given range. In case qualifier is null, a max of all values for the given
 * family is returned.
 *
 * @param tableName name of the table to query
 * @param ci interpreter used to compare values of type {@code R}
 * @param scan scan supplying the start row, stop row and column selection
 * @return the largest value reported by any region, or {@code null} if no
 *         region reported a value
 * @throws Throwable the caller is supposed to handle the exception as they
 *         are thrown and propagated to it
 */
public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
    final Scan scan) throws Throwable {
  validateParameters(scan);
  class MaxCallBack implements Batch.Callback<R> {
    R max = null;

    R getMax() {
      return max;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      // Only hand non-null operands to the interpreter: the first result is
      // taken as-is, and a null region result never replaces a real maximum.
      max = (max == null || (result != null && ci.compare(max, result) < 0))
          ? result : max;
    }
  }
  MaxCallBack aMaxCallBack = new MaxCallBack();
  HTable table = null;
  try {
    table = new HTable(conf, tableName);
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
          @Override
          public R call(AggregateProtocol instance) throws IOException {
            return instance.getMax(ci, scan);
          }
        }, aMaxCallBack);
  } finally {
    // Release the table even when the coprocessor call fails.
    if (table != null) {
      table.close();
    }
  }
  return aMaxCallBack.getMax();
}
/**
 * It gives the minimum value of a column for a given column family for the
 * given range. In case qualifier is null, a min of all values for the given
 * family is returned.
 *
 * @param tableName name of the table to query
 * @param ci interpreter used to compare values of type {@code R}
 * @param scan scan supplying the start row, stop row and column selection
 * @return the smallest value reported by any region, or {@code null} if no
 *         region reported a value
 * @throws Throwable any failure is propagated unchanged to the caller
 */
public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
    final Scan scan) throws Throwable {
  validateParameters(scan);
  class MinCallBack implements Batch.Callback<R> {
    private R min = null;

    public R getMinimum() {
      return min;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      // A null region result must neither reach the interpreter nor
      // overwrite a minimum that another region already supplied.
      min = (min == null || (result != null && ci.compare(result, min) < 0))
          ? result : min;
    }
  }
  MinCallBack minCallBack = new MinCallBack();
  HTable table = null;
  try {
    table = new HTable(conf, tableName);
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
          @Override
          public R call(AggregateProtocol instance) throws IOException {
            return instance.getMin(ci, scan);
          }
        }, minCallBack);
  } finally {
    // Release the table even when the coprocessor call fails.
    if (table != null) {
      table.close();
    }
  }
  log.debug("Min fom all regions is: " + minCallBack.getMinimum());
  return minCallBack.getMinimum();
}
/**
 * It gives the row count, by summing up the individual results obtained from
 * regions. In case the qualifier is null, FirstKeyValueFilter is used to
 * optimise the operation. In case a qualifier is provided, the filter cannot
 * be used, as it may set the flag to skip to the next row while the value
 * read is not of the given qualifier: that row would then not be counted,
 * which is an error.
 *
 * @param tableName name of the table to query
 * @param ci column interpreter forwarded to each region
 * @param scan scan supplying the start row, stop row and column selection
 * @return the sum of the row counts reported by the regions
 * @throws Throwable any failure is propagated unchanged to the caller
 */
public <R, S> long rowCount(final byte[] tableName,
    final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
  validateParameters(scan);
  class RowNumCallback implements Batch.Callback<Long> {
    private final AtomicLong rowCountL = new AtomicLong(0);

    public long getRowNumCount() {
      return rowCountL.get();
    }

    @Override
    public void update(byte[] region, byte[] row, Long result) {
      rowCountL.addAndGet(result.longValue());
    }
  }
  RowNumCallback rowNum = new RowNumCallback();
  HTable table = null;
  try {
    table = new HTable(conf, tableName);
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
          @Override
          public Long call(AggregateProtocol instance) throws IOException {
            return instance.getRowNum(ci, scan);
          }
        }, rowNum);
  } finally {
    // Release the table even when the coprocessor call fails.
    if (table != null) {
      table.close();
    }
  }
  return rowNum.getRowNumCount();
}
/**
 * It sums up the value returned from various regions. In case qualifier is
 * null, summation of all the column qualifiers in the given family is done.
 *
 * @param tableName name of the table to query
 * @param ci interpreter whose {@code add} combines partial sums of type
 *        {@code S}
 * @param scan scan supplying the start row, stop row and column selection
 * @return the combined sum as produced by {@code ci.add}
 * @throws Throwable any failure is propagated unchanged to the caller
 */
public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
    final Scan scan) throws Throwable {
  validateParameters(scan);
  class SumCallBack implements Batch.Callback<S> {
    S sumVal = null;

    public S getSumResult() {
      return sumVal;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, S result) {
      sumVal = ci.add(sumVal, result);
    }
  }
  SumCallBack sumCallBack = new SumCallBack();
  HTable table = null;
  try {
    table = new HTable(conf, tableName);
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), new Batch.Call<AggregateProtocol, S>() {
          @Override
          public S call(AggregateProtocol instance) throws IOException {
            return instance.getSum(ci, scan);
          }
        }, sumCallBack);
  } finally {
    // Release the table even when the coprocessor call fails.
    if (table != null) {
      table.close();
    }
  }
  return sumCallBack.getSumResult();
}
/**
 * It computes average while fetching sum and row count from all the
 * corresponding regions. Approach is to compute a global sum of region level
 * sum and rowcount and then compute the average. This method only gathers the
 * two global values; the division is left to the caller.
 *
 * @param tableName name of the table to query
 * @param ci interpreter whose {@code add} combines partial sums of type
 *        {@code S}
 * @param scan scan supplying the start row, stop row and column selection
 * @return a pair of (global sum, global row count)
 * @throws Throwable any failure is propagated unchanged to the caller
 */
private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
    final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
  validateParameters(scan);
  class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
    S sum = null;
    // Primitive accumulator: avoids re-boxing on every region update.
    long rowCount = 0L;

    public Pair<S, Long> getAvgArgs() {
      return new Pair<S, Long>(sum, rowCount);
    }

    @Override
    public synchronized void update(byte[] region, byte[] row,
        Pair<S, Long> result) {
      sum = ci.add(sum, result.getFirst());
      rowCount += result.getSecond();
    }
  }
  AvgCallBack avgCallBack = new AvgCallBack();
  HTable table = null;
  try {
    table = new HTable(conf, tableName);
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(),
        new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
          @Override
          public Pair<S, Long> call(AggregateProtocol instance)
              throws IOException {
            return instance.getAvg(ci, scan);
          }
        }, avgCallBack);
  } finally {
    // Release the table even when the coprocessor call fails.
    if (table != null) {
      table.close();
    }
  }
  return avgCallBack.getAvgArgs();
}
/**
 * It computes a global standard deviation for a given column and its value.
 * Standard deviation is square root of (average of squares -
 * average*average). From individual regions, it obtains sum, square sum and
 * number of rows. With these, the above values are computed to get the global
 * std. This method only gathers the three global inputs.
 *
 * @param tableName name of the table to query
 * @param ci interpreter whose {@code add} combines partial sums of type
 *        {@code S}
 * @param scan scan supplying the start row, stop row and column selection
 * @return a pair whose first element is the list [sum, sum of squares] and
 *         whose second element is the total row count
 * @throws Throwable any failure is propagated unchanged to the caller
 */
private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
    final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
  validateParameters(scan);
  class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
    long rowCountVal = 0L;
    S sumVal = null, sumSqVal = null;

    public Pair<List<S>, Long> getStdParams() {
      List<S> l = new ArrayList<S>();
      l.add(sumVal);
      l.add(sumSqVal);
      Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
      return p;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row,
        Pair<List<S>, Long> result) {
      // Index 0 carries the region's sum, index 1 its sum of squares.
      sumVal = ci.add(sumVal, result.getFirst().get(0));
      sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
      rowCountVal += result.getSecond();
    }
  }
  StdCallback stdCallback = new StdCallback();
  HTable table = null;
  try {
    table = new HTable(conf, tableName);
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(),
        new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
          @Override
          public Pair<List<S>, Long> call(AggregateProtocol instance)
              throws IOException {
            return instance.getStd(ci, scan);
          }
        }, stdCallback);
  } finally {
    // Release the table even when the coprocessor call fails.
    if (table != null) {
      table.close();
    }
  }
  return stdCallback.getStdParams();
}
/**
 * Gathers the inputs for a global standard deviation of a column. The
 * standard deviation is the square root of (average of squares -
 * average * average); every region contributes its sum, its sum of squares
 * and its row count, and this method combines them into global values.
 *
 * @param tableName name of the table to query
 * @param ci interpreter whose {@code add} combines partial sums of type
 *        {@code S}
 * @param scan scan supplying the start row, stop row and column selection
 * @return a pair whose first element is the list [sum, sum of squares] and
 *         whose second element is the total row count
 * @throws Throwable any failure is propagated unchanged to the caller
 */
private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
    final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
  validateParameters(scan);

  // Accumulates sum, sum of squares and row count across regions.
  class DeviationInputsCallback
      implements Batch.Callback<Pair<List<S>, Long>> {
    long totalRows = 0L;
    S totalSum = null;
    S totalSquares = null;

    public Pair<List<S>, Long> getStdParams() {
      final List<S> sums = new ArrayList<S>();
      sums.add(totalSum);
      sums.add(totalSquares);
      return new Pair<List<S>, Long>(sums, totalRows);
    }

    @Override
    public synchronized void update(byte[] region, byte[] row,
        Pair<List<S>, Long> result) {
      // Index 0 carries the region's sum, index 1 its sum of squares.
      totalSum = ci.add(totalSum, result.getFirst().get(0));
      totalSquares = ci.add(totalSquares, result.getFirst().get(1));
      totalRows += result.getSecond();
    }
  }

  final DeviationInputsCallback collector = new DeviationInputsCallback();
  final Batch.Call<AggregateProtocol, Pair<List<S>, Long>> regionCall =
      new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
        @Override
        public Pair<List<S>, Long> call(AggregateProtocol instance)
            throws IOException {
          return instance.getStd(ci, scan);
        }
      };

  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), regionCall, collector);
  } finally {
    table.close();
  }
  return collector.getStdParams();
}
/**
 * Helps locate the region that contains the median of a column whose weight
 * is given in an optional second column. Every region reports its sum of
 * values and its sum of weights.
 *
 * @param tableName name of the table to query
 * @param ci interpreter whose {@code add} combines partial sums of type
 *        {@code S}
 * @param scan scan supplying the start row, stop row and column selection
 * @return a pair whose first element maps the start row of each region to
 *         that region's (sum of values, sum of weights), and whose second
 *         element is (sum of values, sum of weights) over all chosen regions
 * @throws Throwable any failure is propagated unchanged to the caller
 */
private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianArgs(
    final byte[] tableName, final ColumnInterpreter<R, S> ci,
    final Scan scan) throws Throwable {
  validateParameters(scan);
  final NavigableMap<byte[], List<S>> perRegion =
      new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);

  // Records each region's result and keeps running totals of both sums.
  class MedianInputsCallback implements Batch.Callback<List<S>> {
    S totalValues = null;
    S totalWeights = null;

    public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
      final List<S> totals = new ArrayList<S>();
      totals.add(totalValues);
      totals.add(totalWeights);
      return new Pair<NavigableMap<byte[], List<S>>, List<S>>(perRegion,
          totals);
    }

    @Override
    public synchronized void update(byte[] region, byte[] row,
        List<S> result) {
      perRegion.put(row, result);
      // Index 0 carries the sum of values, index 1 the sum of weights.
      totalValues = ci.add(totalValues, result.get(0));
      totalWeights = ci.add(totalWeights, result.get(1));
    }
  }

  final MedianInputsCallback collector = new MedianInputsCallback();
  final Batch.Call<AggregateProtocol, List<S>> regionCall =
      new Batch.Call<AggregateProtocol, List<S>>() {
        @Override
        public List<S> call(AggregateProtocol instance) throws IOException {
          return instance.getMedian(ci, scan);
        }
      };

  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
        scan.getStopRow(), regionCall, collector);
  } finally {
    table.close();
  }
  return collector.getMedianParams();
}