/** * @param maxSize the target size of elements in the queue * @param blockSize expected average size of blocks */ public CachedEntryQueue(long maxSize, long blockSize) { int initialSize = (int) (maxSize / blockSize); if (initialSize == 0) { initialSize++; } queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() { public int compare(Entry<BlockCacheKey, BucketEntry> entry1, Entry<BlockCacheKey, BucketEntry> entry2) { return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue()); } }).expectedSize(initialSize).create(); cacheSize = 0; this.maxSize = maxSize; }
/** * Attempt to add the specified entry to this queue. * <p> * If the queue is smaller than the max size, or if the specified element is * ordered after the smallest element in the queue, the element will be added * to the queue. Otherwise, there is no side effect of this call. * @param entry a bucket entry with key to try to add to the queue */ public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) { if (cacheSize < maxSize) { queue.add(entry); cacheSize += entry.getValue().getLength(); } else { BucketEntry head = queue.peek().getValue(); if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) { cacheSize += entry.getValue().getLength(); cacheSize -= head.getLength(); if (cacheSize > maxSize) { queue.poll(); } else { cacheSize += head.getLength(); } queue.add(entry); } } }
/** * Do Cache full exception * @throws IOException * @throws InterruptedException */ @Test (timeout=30000) public void testCacheFullException() throws IOException, InterruptedException { this.bc.cacheBlock(this.plainKey, plainCacheable); RAMQueueEntry rqe = q.remove(); RAMQueueEntry spiedRqe = Mockito.spy(rqe); final CacheFullException cfe = new CacheFullException(0, 0); BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); Mockito.doThrow(cfe). doReturn(mockedBucketEntry). when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); }
/** * @param maxSize the target size of elements in the queue * @param blockSize expected average size of blocks */ public CachedEntryQueue(long maxSize, long blockSize) { int initialSize = (int) (maxSize / blockSize); if (initialSize == 0) initialSize++; queue = MinMaxPriorityQueue .orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() { public int compare(Entry<BlockCacheKey, BucketEntry> entry1, Entry<BlockCacheKey, BucketEntry> entry2) { return entry1.getValue().compareTo(entry2.getValue()); } }).expectedSize(initialSize).create(); cacheSize = 0; this.maxSize = maxSize; }
/** * Attempt to add the specified entry to this queue. * * <p> * If the queue is smaller than the max size, or if the specified element is * ordered after the smallest element in the queue, the element will be added * to the queue. Otherwise, there is no side effect of this call. * @param entry a bucket entry with key to try to add to the queue */ public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) { if (cacheSize < maxSize) { queue.add(entry); cacheSize += entry.getValue().getLength(); } else { BucketEntry head = queue.peek().getValue(); if (entry.getValue().compareTo(head) > 0) { cacheSize += entry.getValue().getLength(); cacheSize -= head.getLength(); if (cacheSize > maxSize) { queue.poll(); } else { cacheSize += head.getLength(); } queue.add(entry); } } }
/** * @param maxSize the target size of elements in the queue * @param blockSize expected average size of blocks */ public CachedEntryQueue(long maxSize, long blockSize) { int initialSize = (int) (maxSize / blockSize); if (initialSize == 0) { initialSize++; } queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() { @Override public int compare(Entry<BlockCacheKey, BucketEntry> entry1, Entry<BlockCacheKey, BucketEntry> entry2) { return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue()); } }).expectedSize(initialSize).create(); cacheSize = 0; this.maxSize = maxSize; }
/** * Do Cache full exception * @throws IOException * @throws InterruptedException */ @Test (timeout=30000) public void testCacheFullException() throws IOException, InterruptedException { this.bc.cacheBlock(this.plainKey, plainCacheable); RAMQueueEntry rqe = q.remove(); RAMQueueEntry spiedRqe = Mockito.spy(rqe); final CacheFullException cfe = new CacheFullException(0, 0); BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); Mockito.doThrow(cfe). doReturn(mockedBucketEntry). when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); }
/** * Rebuild the allocator's data structures from a persisted map. * @param availableSpace capacity of cache * @param map A map stores the block key and BucketEntry(block's meta data * like offset, length) * @param realCacheSize cached data size statistics for bucket cache * @throws BucketAllocatorException */ BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, AtomicLong realCacheSize) throws BucketAllocatorException { this(availableSpace, bucketSizes); // each bucket has an offset, sizeindex. probably the buckets are too big // in our default state. so what we do is reconfigure them according to what // we've found. we can only reconfigure each bucket once; if more than once, // we know there's a bug, so we just log the info, throw, and start again... boolean[] reconfigured = new boolean[buckets.length]; for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) { long foundOffset = entry.getValue().offset(); int foundLen = entry.getValue().getLength(); int bucketSizeIndex = -1; for (int i = 0; i < bucketSizes.length; ++i) { if (foundLen <= bucketSizes[i]) { bucketSizeIndex = i; break; } } if (bucketSizeIndex == -1) { throw new BucketAllocatorException( "Can't match bucket size for the block with size " + foundLen); } int bucketNo = (int) (foundOffset / bucketCapacity); if (bucketNo < 0 || bucketNo >= buckets.length) throw new BucketAllocatorException("Can't find bucket " + bucketNo + ", total buckets=" + buckets.length + "; did you shrink the cache?"); Bucket b = buckets[bucketNo]; if (reconfigured[bucketNo]) { if (b.sizeIndex() != bucketSizeIndex) throw new BucketAllocatorException( "Inconsistent allocation in bucket map;"); } else { if (!b.isCompletelyFree()) throw new BucketAllocatorException("Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data"); // Need to remove the bucket from whichever list it's currently in at // the moment... BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex]; BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()]; oldbsi.removeBucket(b); bsi.instantiateBucket(b); reconfigured[bucketNo] = true; } realCacheSize.addAndGet(foundLen); buckets[bucketNo].addAllocation(foundOffset); usedSize += buckets[bucketNo].getItemAllocationSize(); bucketSizeInfos[bucketSizeIndex].blockAllocated(b); } }
/** * Rebuild the allocator's data structures from a persisted map. * @param availableSpace capacity of cache * @param map A map stores the block key and BucketEntry(block's meta data * like offset, length) * @param realCacheSize cached data size statistics for bucket cache * @throws BucketAllocatorException */ BucketAllocator(long availableSpace, Map<BlockCacheKey, BucketEntry> map, AtomicLong realCacheSize) throws BucketAllocatorException { this(availableSpace); // each bucket has an offset, sizeindex. probably the buckets are too big // in our default state. so what we do is reconfigure them according to what // we've found. we can only reconfigure each bucket once; if more than once, // we know there's a bug, so we just log the info, throw, and start again... boolean[] reconfigured = new boolean[buckets.length]; for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) { long foundOffset = entry.getValue().offset(); int foundLen = entry.getValue().getLength(); int bucketSizeIndex = -1; for (int i = 0; i < BUCKET_SIZES.length; ++i) { if (foundLen <= BUCKET_SIZES[i]) { bucketSizeIndex = i; break; } } if (bucketSizeIndex == -1) { throw new BucketAllocatorException( "Can't match bucket size for the block with size " + foundLen); } int bucketNo = (int) (foundOffset / (long) BUCKET_CAPACITY); if (bucketNo < 0 || bucketNo >= buckets.length) throw new BucketAllocatorException("Can't find bucket " + bucketNo + ", total buckets=" + buckets.length + "; did you shrink the cache?"); Bucket b = buckets[bucketNo]; if (reconfigured[bucketNo] == true) { if (b.sizeIndex() != bucketSizeIndex) throw new BucketAllocatorException( "Inconsistent allocation in bucket map;"); } else { if (!b.isCompletelyFree()) throw new BucketAllocatorException("Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data"); // Need to remove the bucket from whichever list it's currently in at // the moment... BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex]; BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()]; oldbsi.removeBucket(b); bsi.instantiateBucket(b); reconfigured[bucketNo] = true; } realCacheSize.addAndGet(foundLen); buckets[bucketNo].addAllocation(foundOffset); usedSize += buckets[bucketNo].itemAllocationSize(); bucketSizeInfos[bucketSizeIndex].blockAllocated(b); } }
/** * Rebuild the allocator's data structures from a persisted map. * @param availableSpace capacity of cache * @param map A map stores the block key and BucketEntry(block's meta data * like offset, length) * @param realCacheSize cached data size statistics for bucket cache * @throws BucketAllocatorException */ BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, LongAdder realCacheSize) throws BucketAllocatorException { this(availableSpace, bucketSizes); // each bucket has an offset, sizeindex. probably the buckets are too big // in our default state. so what we do is reconfigure them according to what // we've found. we can only reconfigure each bucket once; if more than once, // we know there's a bug, so we just log the info, throw, and start again... boolean[] reconfigured = new boolean[buckets.length]; int sizeNotMatchedCount = 0; int insufficientCapacityCount = 0; Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next(); long foundOffset = entry.getValue().offset(); int foundLen = entry.getValue().getLength(); int bucketSizeIndex = -1; for (int i = 0; i < this.bucketSizes.length; ++i) { if (foundLen <= this.bucketSizes[i]) { bucketSizeIndex = i; break; } } if (bucketSizeIndex == -1) { sizeNotMatchedCount++; iterator.remove(); continue; } int bucketNo = (int) (foundOffset / bucketCapacity); if (bucketNo < 0 || bucketNo >= buckets.length) { insufficientCapacityCount++; iterator.remove(); continue; } Bucket b = buckets[bucketNo]; if (reconfigured[bucketNo]) { if (b.sizeIndex() != bucketSizeIndex) { throw new BucketAllocatorException("Inconsistent allocation in bucket map;"); } } else { if (!b.isCompletelyFree()) { throw new BucketAllocatorException( "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data"); } // Need to remove the bucket from whichever list it's currently in at // the moment... BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex]; BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()]; oldbsi.removeBucket(b); bsi.instantiateBucket(b); reconfigured[bucketNo] = true; } realCacheSize.add(foundLen); buckets[bucketNo].addAllocation(foundOffset); usedSize += buckets[bucketNo].getItemAllocationSize(); bucketSizeInfos[bucketSizeIndex].blockAllocated(b); } if (sizeNotMatchedCount > 0) { LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " + "there is no matching bucket size for these blocks"); } if (insufficientCapacityCount > 0) { LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - " + "did you shrink the cache?"); } }
/** * @return The next element in this queue, or {@code null} if the queue is * empty. */ public Map.Entry<BlockCacheKey, BucketEntry> poll() { return queue.poll(); }
/** * @return The last element in this queue, or {@code null} if the queue is * empty. */ public Map.Entry<BlockCacheKey, BucketEntry> pollLast() { return queue.pollLast(); }