@Test public void testFilterAllRecords() throws IOException { Scan scan = new Scan(); scan.setBatch(1); scan.setCaching(1); // Filter out any records scan.setFilter(new FilterList(new FirstKeyOnlyFilter(), new InclusiveStopFilter(new byte[0]))); Table table = TEST_UTIL.getConnection().getTable(TableName.NAMESPACE_TABLE_NAME); ResultScanner s = table.getScanner(scan); assertNull(s.next()); table.close(); }
@Test public void testInclusiveStopFilter() throws Exception { // Grab rows from group one // If we just use start/stop row, we get total/2 - 1 rows long expectedRows = (numRows / 2) - 1; long expectedKeys = colsPerRow; Scan s = new Scan(Bytes.toBytes("testRowOne-0"), Bytes.toBytes("testRowOne-3")); verifyScan(s, expectedRows, expectedKeys); // Now use start row with inclusive stop filter expectedRows = numRows / 2; s = new Scan(Bytes.toBytes("testRowOne-0")); s.setFilter(new InclusiveStopFilter(Bytes.toBytes("testRowOne-3"))); verifyScan(s, expectedRows, expectedKeys); // Grab rows from group two // If we just use start/stop row, we get total/2 - 1 rows expectedRows = (numRows / 2) - 1; expectedKeys = colsPerRow; s = new Scan(Bytes.toBytes("testRowTwo-0"), Bytes.toBytes("testRowTwo-3")); verifyScan(s, expectedRows, expectedKeys); // Now use start row with inclusive stop filter expectedRows = numRows / 2; s = new Scan(Bytes.toBytes("testRowTwo-0")); s.setFilter(new InclusiveStopFilter(Bytes.toBytes("testRowTwo-3"))); verifyScan(s, expectedRows, expectedKeys); }
@Test public void testFilterAllRecords() throws IOException { Scan scan = new Scan(); scan.setBatch(1); scan.setCaching(1); // Filter out any records scan.setFilter(new FilterList(new FirstKeyOnlyFilter(), new InclusiveStopFilter(new byte[0]))); try (Table table = TEST_UTIL.getConnection().getTable(TableName.NAMESPACE_TABLE_NAME)) { try (ResultScanner s = table.getScanner(scan)) { assertNull(s.next()); } } }
@Override public void queryByStartRowAndEndRow(RpcController controller, DataProtos.DataQueryRequest request, RpcCallback<DataQueryResponse> done) { DataProtos.DataQueryResponse response = null; InternalScanner scanner = null; try { String startRow = request.getStartRow(); String endRow = request.getEndRow(); String regionStartKey = Bytes.toString(this.env.getRegion().getRegionInfo().getStartKey()); String regionEndKey = Bytes.toString(this.env.getRegion().getRegionInfo().getEndKey()); if (request.getIsSalting()) { // 如果加盐过则在key前添加盐值 String startSalt = null; String endSalt = null; if (StrUtils.isNotEmpty(regionStartKey)) { startSalt = regionStartKey.split("_")[0]; // 加盐的方式为盐值+"_",所以取_前面的 } if (StrUtils.isNotEmpty(regionEndKey)) { endSalt = regionStartKey.split("_")[0]; //加盐的方式为盐值+"_",所以取_前面的 } if (startSalt != null) { if (null != startRow) { startRow = startSalt + "_" + startRow; endRow = endSalt + "_" + endRow; } } } Scan scan = new Scan(); if (null != startRow) { scan.setStartRow(Bytes.toBytes(startRow)); } if (null != endRow) { if (request.getIncluedEnd()) { Filter filter = new InclusiveStopFilter(Bytes.toBytes(endRow)); scan.setFilter(filter); } else { scan.setStopRow(Bytes.toBytes(endRow)); } } scanner = this.env.getRegion().getScanner(scan); List<Cell> results = new ArrayList<>(); boolean hasMore; DataProtos.DataQueryResponse.Builder responseBuilder = DataProtos.DataQueryResponse.newBuilder(); do { hasMore = scanner.next(results); DataProtos.DataQueryResponse.Row.Builder rowBuilder = DataProtos.DataQueryResponse.Row.newBuilder(); if (results.size() > 0) { rowBuilder.setRowKey(ByteString.copyFrom(results.get(0).getRow())); for (Cell kv : results) { queryBuilder(rowBuilder, ByteString.copyFrom(kv.getFamily()), ByteString.copyFrom(kv.getQualifier()), ByteString.copyFrom(kv.getRow()), ByteString.copyFrom(kv.getValue())); } } responseBuilder.addRowList(rowBuilder); results.clear(); } while (hasMore); response = responseBuilder.build(); } catch (IOException ignored) { ResponseConverter.setControllerException(controller, ignored); } finally { if (scanner != null) { try { scanner.close(); } catch (IOException e) { _log.error(ExceptionUtils.errorInfo(e)); } } } done.run(response); }
private void initScanner() throws IOException { scan = new Scan(); scan.setBatch(scanFetchSize); scan.setCacheBlocks(false); scan.setCaching(scanFetchSize); FilterList filters = null; if (targetIndexes == null || targetIndexes.length == 0) { filters = new FilterList(FilterList.Operator.MUST_PASS_ALL); filters.addFilter(new FirstKeyOnlyFilter()); filters.addFilter(new KeyOnlyFilter()); } else { boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); for (int eachIndex : targetIndexes) { if (isRowKeyMappings[eachIndex]) { continue; } byte[][] mappingColumn = columnMapping.getMappingColumns()[eachIndex]; if (mappingColumn[1] == null) { scan.addFamily(mappingColumn[0]); } else { scan.addColumn(mappingColumn[0], mappingColumn[1]); } } } scan.setStartRow(fragment.getStartKey().getBytes()); if (fragment.isLast() && !fragment.getEndKey().isEmpty() && fragment.getEndKey().getBytes().length > 0) { // last and stopRow is not empty if (filters == null) { filters = new FilterList(); } filters.addFilter(new InclusiveStopFilter(fragment.getEndKey().getBytes())); } else { scan.setStopRow(fragment.getEndKey().getBytes()); } if (filters != null) { scan.setFilter(filters); } if (htable == null) { HConnection hconn = ((HBaseTablespace) TablespaceManager.get(fragment.getUri())).getConnection(); htable = hconn.getTable(fragment.getHbaseTableName()); } scanner = htable.getScanner(scan); }
@Test public void testCompositeRowIndexPredication() throws Exception { executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " + "'hbase.split.rowkeys'='010,040,060,080', " + "'hbase.rowkey.delimiter'='_')").close(); assertTableExists("hbase_mapped_table"); HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); hAdmin.tableExists("hbase_table"); HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); try { org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); assertEquals(5, keys.getFirst().length); DecimalFormat df = new DecimalFormat("000"); for (int i = 0; i < 100; i++) { Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes()); put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); htable.put(put); } Scan scan = new Scan(); scan.setStartRow("021".getBytes()); scan.setStopRow(("021_" + new String(new char[]{Character.MAX_VALUE})).getBytes()); Filter filter = new InclusiveStopFilter(scan.getStopRow()); scan.setFilter(filter); ResultScanner scanner = htable.getScanner(scan); Result result = scanner.next(); assertNotNull(result); assertEquals("021_021", new String(result.getRow())); scanner.close(); assertIndexPredication(true); ResultSet res = executeString("select * from hbase_mapped_table where rk = '021'"); String expected = "rk,rk2,col1,col2,col3\n" + "-------------------------------\n" + "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n"; assertEquals(expected, resultSetToString(res)); res.close(); } finally { executeString("DROP TABLE hbase_mapped_table PURGE").close(); htable.close(); hAdmin.close(); } }