/**
 * Stop this scanner from scanning the given block pool.
 *
 * <p>Walks the registered block iterators and removes the first one whose
 * block pool id equals {@code bpid}. The removed iterator is handed to
 * {@code IOUtils.cleanup}, and if it was the iterator currently being
 * scanned, the current iterator is cleared. {@code notify()} is then called
 * on this object's monitor. If no registered iterator matches, a warning is
 * logged and nothing else happens.
 *
 * @param bpid The block pool id.
 */
public synchronized void disableBlockPoolId(String bpid) {
  for (Iterator<BlockIterator> it = blockIters.iterator(); it.hasNext();) {
    BlockIterator candidate = it.next();
    if (!candidate.getBlockPoolId().equals(bpid)) {
      continue;
    }
    LOG.trace("{}: disabling scanning on block pool {}", this, bpid);
    it.remove();
    IOUtils.cleanup(null, candidate);
    // Identity comparison: only clear the cursor if it is this very iterator.
    if (candidate == curBlockIter) {
      curBlockIter = null;
    }
    notify();
    return;
  }
  LOG.warn("{}: can't remove block pool {}, because it was never " +
      "added.", this, bpid);
}
/**
 * Save the given block iterator on a best-effort basis.
 *
 * <p>An {@link IOException} thrown by {@code iter.save()} is logged at WARN
 * level and is not propagated to the caller.
 *
 * @param iter The block iterator to save.
 */
private void saveBlockIterator(BlockIterator iter) {
  try {
    iter.save();
  } catch (IOException ioe) {
    LOG.warn("{}: error saving {}.", this, iter, ioe);
  }
}
/**
 * Pick the next block iterator that can be scanned.
 *
 * <p>Iterators are examined in list order, starting just after the current
 * one and wrapping around, so that a single block pool id is not rescanned
 * over and over while other block pools stay unscanned.
 *
 * <p>An iterator that is not at EOF is always usable. An iterator at EOF
 * becomes usable again once conf.scanPeriodMs milliseconds have elapsed
 * since its recorded start time; in that case it is rewound before being
 * selected.
 *
 * @return 0 if a usable block iterator was found (it becomes the current
 *         iterator); Long.MAX_VALUE if no block pools are registered;
 *         otherwise the smallest number of milliseconds to wait before any
 *         iterator becomes usable again.
 */
private synchronized long findNextUsableBlockIter() {
  final int poolCount = blockIters.size();
  if (poolCount == 0) {
    LOG.debug("{}: no block pools are registered.", this);
    return Long.MAX_VALUE;
  }

  int cursor = 0;
  if (curBlockIter != null) {
    cursor = blockIters.indexOf(curBlockIter);
    Preconditions.checkState(cursor >= 0);
  }

  // Wall-clock time is required here rather than monotonic time: the
  // timestamp stored in the cursor file is a wall-clock value, and a
  // monotonic clock would not be meaningful there because it resets
  // whenever the machine reboots (on most platforms).
  final long wallClockNowMs = Time.now();
  long shortestWaitMs = Long.MAX_VALUE;

  for (int visited = 0; visited < poolCount; visited++) {
    // Advance first, so the search begins with the iterator after the cursor.
    cursor = (cursor + 1) % poolCount;
    BlockIterator candidate = blockIters.get(cursor);

    if (!candidate.atEnd()) {
      LOG.info("Now scanning bpid {} on volume {}",
          candidate.getBlockPoolId(), volume.getBasePath());
      curBlockIter = candidate;
      return 0L;
    }

    long remainingMs =
        (candidate.getIterStartMs() + conf.scanPeriodMs) - wallClockNowMs;
    if (remainingMs > 0) {
      // Not yet due for a rescan; remember the soonest wake-up time.
      shortestWaitMs = Math.min(shortestWaitMs, remainingMs);
      continue;
    }

    candidate.rewind();
    LOG.info("Now rescanning bpid {} on volume {}, after more than " +
        "{} hour(s)", candidate.getBlockPoolId(), volume.getBasePath(),
        TimeUnit.MILLISECONDS.toHours(conf.scanPeriodMs));
    curBlockIter = candidate;
    return 0L;
  }

  LOG.info("{}: no suitable block pools found to scan.  Waiting {} ms.",
      this, shortestWaitMs);
  return shortestWaitMs;
}