public <T extends HBase> List<T> search(List<byte[]> rowkeys, FilterBase filter, Class<? extends HBase> clazz) throws Exception{ TableMeta tableMeta = getTableMeta(clazz); List<Get> gets = Lists.newArrayList(); for (byte[] rowkey : rowkeys) { Get get = new Get(rowkey); if (null != filter) { get.setFilter(filter); } gets.add(get); } List<T> hBaseList = Lists.newArrayList(); try (Table table = connection.getTable(tableMeta.getHtableName())){ Result[] results = table.get(gets); for (Result rs: results){ HBase hBase = tableMeta.parse(rs); if(hBase != null){ hBaseList.add((T)hBase); } } } return hBaseList; }
@Test public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); final int expectedSize = 3; testFlushBeforeCompletingScan(new MyListHook() { @Override public void hook(int currentSize) { if (currentSize == expectedSize - 1) { try { flushStore(store, id++); timeToGoNextRow.set(true); } catch (IOException e) { throw new RuntimeException(e); } } } }, new FilterBase() { @Override public Filter.ReturnCode filterCell(final Cell c) throws IOException { return ReturnCode.INCLUDE; } }, expectedSize); }
/** * Test addFilterAndArguments method of Import This method set couple * parameters into Configuration */ @Test public void testAddFilterAndArguments() throws IOException { Configuration configuration = new Configuration(); List<String> args = new ArrayList<String>(); args.add("param1"); args.add("param2"); Import.addFilterAndArguments(configuration, FilterBase.class, args); assertEquals("org.apache.hadoop.hbase.filter.FilterBase", configuration.get(Import.FILTER_CLASS_CONF_KEY)); assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); }
@Test public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); final int expectedSize = 2; testFlushBeforeCompletingScan(new MyListHook() { @Override public void hook(int currentSize) { if (currentSize == expectedSize - 1) { try { flushStore(store, id++); timeToGoNextRow.set(true); } catch (IOException e) { throw new RuntimeException(e); } } } }, new FilterBase() { @Override public Filter.ReturnCode filterCell(final Cell c) throws IOException { if (timeToGoNextRow.get()) { timeToGoNextRow.set(false); return ReturnCode.NEXT_ROW; } else { return ReturnCode.INCLUDE; } } }, expectedSize); }
@Test public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException { final AtomicBoolean timeToGetHint = new AtomicBoolean(false); final int expectedSize = 2; testFlushBeforeCompletingScan(new MyListHook() { @Override public void hook(int currentSize) { if (currentSize == expectedSize - 1) { try { flushStore(store, id++); timeToGetHint.set(true); } catch (IOException e) { throw new RuntimeException(e); } } } }, new FilterBase() { @Override public Filter.ReturnCode filterCell(final Cell c) throws IOException { if (timeToGetHint.get()) { timeToGetHint.set(false); return Filter.ReturnCode.SEEK_NEXT_USING_HINT; } else { return Filter.ReturnCode.INCLUDE; } } @Override public Cell getNextCellHint(Cell currentCell) throws IOException { return currentCell; } }, expectedSize); }
/** * Test addFilterAndArguments method of Import This method set couple * parameters into Configuration */ @Test public void testAddFilterAndArguments() throws IOException { Configuration configuration = new Configuration(); List<String> args = new ArrayList<>(); args.add("param1"); args.add("param2"); Import.addFilterAndArguments(configuration, FilterBase.class, args); assertEquals("org.apache.hadoop.hbase.filter.FilterBase", configuration.get(Import.FILTER_CLASS_CONF_KEY)); assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); }
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) throws IOException { // DebugPrint.println("HRegionScanner.<init>"); this.region = region; this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { this.stopRow = null; } else { this.stopRow = scan.getStopRow(); } // If we are doing a get, we want to be [startRow,endRow] normally // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); synchronized(scannerReadPoints) { if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { // This scan can read even uncommitted transactions this.readPt = Long.MAX_VALUE; MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); } else { this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); } scannerReadPoints.put(this, this.readPt); } // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>(); if (additionalScanners != null) { scanners.addAll(additionalScanners); } for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); KeyValueScanner scanner = store.getScanner(scan, entry.getValue()); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || FilterBase.isFamilyEssential(this.filter, entry.getKey())) { scanners.add(scanner); } else { joinedScanners.add(scanner); } } this.storeHeap = new KeyValueHeap(scanners, comparator); if (!joinedScanners.isEmpty()) { this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); } // whether to use index byte[] tmpvalue=scan.getAttribute(IndexConstants.SCAN_WITH_INDEX); if(tmpvalue!=null){ this.useIndex = Bytes.toBoolean(tmpvalue); } tmpvalue=scan.getAttribute(IndexConstants.MAX_SCAN_SCALE); float maxScale=IndexConstants.DEFAULT_MAX_SCAN_SCALE; if(tmpvalue!=null){ maxScale = Bytes.toFloat(tmpvalue); } if (this.useIndex) { indexTree = ScanPreprocess.preprocess(this.region, scan.getFilter(), maxScale); if (indexTree!=null) { useIndex = true; long buildStartTime = System.currentTimeMillis(); generateCandidateRows(scan); this.indexReadTime = (System.currentTimeMillis() - buildStartTime) - this.indexMergeTime - this.indexSortTime; } else { useIndex = false; LOG.debug("skip using index"); } } }
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) throws IOException { // DebugPrint.println("HRegionScanner.<init>"); this.region = region; this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { this.stopRow = null; } else { this.stopRow = scan.getStopRow(); } // If we are doing a get, we want to be [startRow,endRow] normally // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); synchronized(scannerReadPoints) { if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { // This scan can read even uncommitted transactions this.readPt = Long.MAX_VALUE; MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); } else { this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); } scannerReadPoints.put(this, this.readPt); } // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>(); if (additionalScanners != null) { scanners.addAll(additionalScanners); } for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); KeyValueScanner scanner = store.getScanner(scan, entry.getValue()); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || FilterBase.isFamilyEssential(this.filter, entry.getKey())) { scanners.add(scanner); } else { joinedScanners.add(scanner); } } this.storeHeap = new KeyValueHeap(scanners, comparator); if (!joinedScanners.isEmpty()) { this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); } }
private Scan wrapWithDeleteFilter(final Scan scan, final TransactionState state) { FilterBase deleteFilter = new FilterBase() { private boolean rowFiltered = false; @Override public void reset() { rowFiltered = false; } @Override public boolean hasFilterRow() { return true; } @Override public void filterRow(final List<KeyValue> kvs) { state.applyDeletes(kvs, scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); rowFiltered = kvs.isEmpty(); } @Override public boolean filterRow() { return rowFiltered; } @Override public void write(final DataOutput out) throws IOException { // does nothing } @Override public void readFields(final DataInput in) throws IOException { // does nothing } }; if (scan.getFilter() == null) { scan.setFilter(deleteFilter); return scan; } FilterList wrappedFilter = new FilterList(Arrays.asList(deleteFilter, scan.getFilter())); scan.setFilter(wrappedFilter); return scan; }