RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region, long nonceGroup, long nonce) throws IOException { this.region = region; this.maxResultSize = scan.getMaxResultSize(); if (scan.hasFilter()) { this.filter = new FilterWrapper(scan.getFilter()); } else { this.filter = null; } this.comparator = region.getCellComparator(); /** * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default * scanner context that can be used to enforce the batch limit in the event that a * ScannerContext is not specified during an invocation of next/nextRaw */ defaultScannerContext = ScannerContext.newBuilder() .setBatchLimit(scan.getBatch()).build(); this.stopRow = scan.getStopRow(); this.includeStopRow = scan.includeStopRow(); // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); synchronized (scannerReadPoints) { if (mvccReadPoint > 0) { this.readPt = mvccReadPoint; } else if (nonce == HConstants.NO_NONCE || rsServices == null || rsServices.getNonceManager() == null) { this.readPt = getReadPoint(isolationLevel); } else { this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce); } scannerReadPoints.put(this, this.readPt); } initializeScanners(scan, additionalScanners); }
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) throws IOException { this.region = region; this.maxResultSize = scan.getMaxResultSize(); if (scan.hasFilter()) { this.filter = new FilterWrapper(scan.getFilter()); } else { this.filter = null; } this.batch = scan.getBatch();//一次next所调用的最大数据量 if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) { this.stopRow = null; } else { this.stopRow = scan.getStopRow(); } // If we are doing a get, we want to be [startRow,endRow] normally // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); synchronized (scannerReadPoints) { this.readPt = getReadpoint(isolationLevel); scannerReadPoints.put(this, this.readPt); } // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>(); if (additionalScanners != null) { scanners.addAll(additionalScanners); } for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { scanners.add(scanner); } else { joinedScanners.add(scanner); } } initializeKVHeap(scanners, joinedScanners, region); }
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) throws IOException { this.region = region; this.maxResultSize = scan.getMaxResultSize(); if (scan.hasFilter()) { this.filter = new FilterWrapper(scan.getFilter()); } else { this.filter = null; } this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) { this.stopRow = null; } else { this.stopRow = scan.getStopRow(); } // If we are doing a get, we want to be [startRow,endRow] normally // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); synchronized(scannerReadPoints) { this.readPt = getReadpoint(isolationLevel); scannerReadPoints.put(this, this.readPt); } // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>(); if (additionalScanners != null) { scanners.addAll(additionalScanners); } for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { scanners.add(scanner); } else { joinedScanners.add(scanner); } } initializeKVHeap(scanners, joinedScanners, region); }
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) throws IOException { this.region = region; this.maxResultSize = scan.getMaxResultSize(); if (scan.hasFilter()) { this.filter = new FilterWrapper(scan.getFilter()); } else { this.filter = null; } this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) { this.stopRow = null; } else { this.stopRow = scan.getStopRow(); } // If we are doing a get, we want to be [startRow,endRow] normally // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); synchronized(scannerReadPoints) { if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { // This scan can read even uncommitted transactions this.readPt = Long.MAX_VALUE; MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); } else { this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); } scannerReadPoints.put(this, this.readPt); } // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). 
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>(); if (additionalScanners != null) { scanners.addAll(additionalScanners); } for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); KeyValueScanner scanner = store.getScanner(scan, entry.getValue()); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { scanners.add(scanner); } else { joinedScanners.add(scanner); } } this.storeHeap = new KeyValueHeap(scanners, comparator); if (!joinedScanners.isEmpty()) { this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); } }
/**
 * Builds a region scanner for {@code scan}.
 *
 * <p>Copies limits, filter and stop row from the scan, selects a read point according to the
 * scan's isolation level and records it in {@code scannerReadPoints}, opens one store scanner
 * per column family in the scan's family map, and wraps the scanners in {@code KeyValueHeap}s
 * ({@code storeHeap} always, {@code joinedHeap} only when there are joined scanners).
 *
 * @param scan the scan request this scanner is configured from
 * @param additionalScanners extra scanners placed in the main scanner list; may be
 *     {@code null}
 * @throws IOException declared by this constructor; presumably raised while store scanners
 *     or heaps are created (not visible from here)
 */
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
  this.maxResultSize = scan.getMaxResultSize();

  if (!scan.hasFilter()) {
    this.filter = null;
  } else {
    this.filter = new FilterWrapper(scan.getFilter());
  }

  this.batch = scan.getBatch();

  // A stop row equal to EMPTY_END_ROW is recorded as "no stop row" (null).
  boolean emptyStopRow = Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW);
  this.stopRow = emptyStopRow ? null : scan.getStopRow();

  // For a get we want [startRow,endRow]; normally the range is [startRow,endRow), which
  // yields nothing when startRow == endRow.
  if (scan.isGetScan()) {
    this.isScan = -1;
  } else {
    this.isScan = 0;
  }

  // Publish the read point while holding the scannerReadPoints monitor so that nobody
  // calculates getSmallestReadPoint before scannerReadPoints is updated.
  IsolationLevel isolationLevel = scan.getIsolationLevel();
  synchronized (scannerReadPoints) {
    if (isolationLevel != IsolationLevel.READ_UNCOMMITTED) {
      this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    } else {
      // This scan can read even uncommitted transactions.
      this.readPt = Long.MAX_VALUE;
      MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
    }
    scannerReadPoints.put(this, this.readPt);
  }

  // Two lists: scanners supplying data the filter requires (scanners) and every other
  // scanner (joinedScanners).
  List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
  List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
  if (additionalScanners != null) {
    scanners.addAll(additionalScanners);
  }

  for (Map.Entry<byte[], NavigableSet<byte[]>> familyEntry
      : scan.getFamilyMap().entrySet()) {
    byte[] family = familyEntry.getKey();
    NavigableSet<byte[]> columns = familyEntry.getValue();
    KeyValueScanner scanner = stores.get(family).getScanner(scan, columns);

    // A family is deferred to the joined list only when a filter exists, on-demand loading
    // is enabled, and the filter does not consider the family essential.
    boolean deferred = this.filter != null
        && scan.doLoadColumnFamiliesOnDemand()
        && !this.filter.isFamilyEssential(family);
    if (deferred) {
      joinedScanners.add(scanner);
    } else {
      scanners.add(scanner);
    }
  }

  this.storeHeap = new KeyValueHeap(scanners, comparator);
  // The joined heap is only created when at least one family was deferred.
  if (!joinedScanners.isEmpty()) {
    this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
  }
}