/** * Cache the block to ramCache * @param cacheKey block's cache key * @param cachedItem block buffer * @param inMemory if block is in-memory * @param wait if true, blocking wait when queue is full */ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem); if (!cacheEnabled) { return; } if (backingMap.containsKey(cacheKey)) { Cacheable existingBlock = getBlock(cacheKey, false, false, false); try { if (BlockCacheUtil.compareCacheBlock(cachedItem, existingBlock) != 0) { throw new RuntimeException("Cached block contents differ, which should not have happened." + "cacheKey:" + cacheKey); } String msg = "Caching an already cached block: " + cacheKey; msg += ". This is harmless and can happen in rare cases (see HBASE-8547)"; LOG.warn(msg); } finally { // return the block since we need to decrement the count returnBlock(cacheKey, existingBlock); } return; } /* * Stuff the entry into the RAM cache so it can get drained to the persistent store */ RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); if (ramCache.putIfAbsent(cacheKey, re) != null) { return; } int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); boolean successfulAddition = false; if (wait) { try { successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { successfulAddition = bq.offer(re); } if (!successfulAddition) { ramCache.remove(cacheKey); cacheStats.failInsert(); } else { this.blockNumber.increment(); this.heapSize.add(cachedItem.heapSize()); blocksByHFile.add(cacheKey); } }