/** * Transfers data from file to the given byte buffer * @param offset The offset in the file where the first byte to be read * @param length The length of buffer that should be allocated for reading * from the file channel * @return number of bytes read * @throws IOException */ @Override public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer) throws IOException { Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0."); ByteBuffer dstBuffer = ByteBuffer.allocate(length); if (length != 0) { accessFile(readAccessor, dstBuffer, offset); // The buffer created out of the fileChannel is formed by copying the data from the file // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts // this buffer from the file the data is already copied and there is no need to ensure that // the results are not corrupted before consuming them. if (dstBuffer.limit() != length) { throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length + " expected"); } } return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE); }
/**
 * Looks up the {@code blockDeserializer} field of {@code HFileBlock} through
 * {@code getProtectedField} and, when it holds a non-null value, stores that value in this
 * class's {@code deserializer} holder.
 * <p>
 * This is best-effort: every failure is logged and the method returns normally.
 */
public static void setHFileDeserializer() {
  Field field = getProtectedField(HFileBlock.class, "blockDeserializer");
  if (field == null) {
    LOG.error("Could not get access to HFileBlock.blockDeserializer");
    return;
  }
  try {
    // A null receiver is passed to Field.get, which only works when the field is static.
    // The cast is unchecked because generic type arguments are erased at runtime.
    @SuppressWarnings("unchecked")
    CacheableDeserializer<Cacheable> serde =
        (CacheableDeserializer<Cacheable>) field.get(null);
    if (serde != null) {
      deserializer.set(serde);
    } else {
      LOG.warn("HFileBlock.blockDeserializer is null");
    }
  } catch (Exception e) {
    // Keep the broad catch (best-effort), but log the cause so the failure can be diagnosed.
    LOG.warn("unable to read HFileBlock.blockDeserializer", e);
  }
}
/**
 * Builds a Cacheable from {@code length} bytes at {@code offset} using a private on-heap copy.
 * A new byte array is allocated, filled through {@code bufferArray.getMultiple}, wrapped, and
 * passed to the deserializer tagged as {@code MemoryType.EXCLUSIVE}.
 * @param offset The offset passed to the buffer array as the start of the read
 * @param length The number of bytes requested; also the size of the array allocated
 * @param deserializer The deserializer used to make a Cacheable from the data
 * @return the Cacheable produced by the deserializer
 * @throws IOException declared by the interface
 */
@Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
    throws IOException {
  final byte[] copy = new byte[length];
  bufferArray.getMultiple(offset, length, copy);
  final ByteBuffer heapBuffer = ByteBuffer.wrap(copy);
  return deserializer.deserialize(new SingleByteBuff(heapBuffer), true, MemoryType.EXCLUSIVE);
}
@Override public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer) throws IOException { ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length); // Here the buffer that is created directly refers to the buffer in the actual buckets. // When any cell is referring to the blocks created out of these buckets then it means that // those cells are referring to a shared memory area which if evicted by the BucketCache would // lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY // so that the readers using this block are aware of this fact and do the necessary action // to prevent eviction till the results are either consumed or copied return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED); }
@Override public void write(ByteBuffer buf, Cacheable obj) throws IOException { if( deserializer.get() == null){ CacheableDeserializer<Cacheable> des = obj.getDeserializer(); deserializer.compareAndSet(null, des); } // Serializer does not honor current buffer position int len = obj.getSerializedLength(); int pos = buf.position(); obj.serialize(buf); buf.limit(len + pos); buf.position(len+pos); }
/**
 * Get the buffer of the block with the specified key.
 * <p>
 * The RAM cache is consulted first. On a miss there, the backing map is consulted and the block
 * is read from the IO engine while holding the read lock associated with the entry's offset.
 * @param key block's cache key
 * @param caching true if the caller caches blocks on cache misses
 * @param repeat Whether this is a repeat lookup for the same block
 * @param updateCacheMetrics Whether we should update cache metrics or not
 * @return buffer of specified cache key, or null if not in cache; null is also returned when
 *         the cache is disabled or when reading the block fails with an IOException
 * @throws RuntimeException if the IO engine reports a byte count different from the entry's
 *         length
 */
@Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
  if (!cacheEnabled) {
    return null;
  }
  RAMQueueEntry re = ramCache.get(key);
  if (re != null) {
    if (updateCacheMetrics) {
      cacheStats.hit(caching, key.isPrimary());
    }
    re.access(accessCount.incrementAndGet());
    return re.getData();
  }
  BucketEntry bucketEntry = backingMap.get(key);
  if (bucketEntry != null) {
    long start = System.nanoTime();
    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
    // Acquire the lock BEFORE entering the try block. If acquisition failed inside the try,
    // the finally clause would unlock a lock this thread does not hold and the resulting
    // IllegalMonitorStateException would mask the original failure.
    lock.readLock().lock();
    try {
      // We can not read here even if backingMap does contain the given key because its offset
      // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
      // existence here.
      if (bucketEntry.equals(backingMap.get(key))) {
        int len = bucketEntry.getLength();
        ByteBuffer bb = ByteBuffer.allocate(len);
        int lenRead = ioEngine.read(bb, bucketEntry.offset());
        if (lenRead != len) {
          throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
        }
        CacheableDeserializer<Cacheable> deserializer =
            bucketEntry.deserializerReference(this.deserialiserMap);
        Cacheable cachedBlock = deserializer.deserialize(bb, true);
        long timeTaken = System.nanoTime() - start;
        if (updateCacheMetrics) {
          cacheStats.hit(caching, key.isPrimary());
          cacheStats.ioHit(timeTaken);
        }
        bucketEntry.access(accessCount.incrementAndGet());
        // A successful read resets the IO error start time.
        if (this.ioErrorStartTime > 0) {
          ioErrorStartTime = -1;
        }
        return cachedBlock;
      }
    } catch (IOException ioex) {
      LOG.error("Failed reading block " + key + " from bucket cache", ioex);
      checkIOErrorIsTolerated();
    } finally {
      lock.readLock().unlock();
    }
  }
  // Reached on a miss, on a changed entry, or after a tolerated IOException.
  if (!repeat && updateCacheMetrics) {
    cacheStats.miss(caching, key.isPrimary());
  }
  return null;
}
/**
 * Resolves the deserializer for this entry: the stored {@code deserialiserIndex} is translated
 * back through {@code deserialiserMap.unmap} and the result is looked up in
 * {@code CacheableDeserializerIdManager}.
 * @param deserialiserMap the map used to translate the stored index
 * @return the deserializer returned by {@code CacheableDeserializerIdManager.getDeserializer}
 */
protected CacheableDeserializer<Cacheable> deserializerReference(
    UniqueIndexMap<Integer> deserialiserMap) {
  final Integer deserializerId = deserialiserMap.unmap(deserialiserIndex);
  return CacheableDeserializerIdManager.getDeserializer(deserializerId);
}
/**
 * Records which deserializer belongs to this entry. The deserializer's identifier is passed to
 * {@code deserialiserMap.map} and the result, narrowed to a byte, is stored in
 * {@code deserialiserIndex}.
 * @param deserializer the deserializer whose identifier is recorded
 * @param deserialiserMap the map that translates the identifier into the stored index
 */
protected void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer,
    UniqueIndexMap<Integer> deserialiserMap) {
  final int identifier = deserializer.getDeserialiserIdentifier();
  this.deserialiserIndex = (byte) deserialiserMap.map(identifier);
}
/**
 * Creates a pair of serialized data and the deserializer for it, together with a new
 * {@code AtomicLong} for {@code recentlyAccessed}.
 * @param deserializer the deserializer stored with the data
 * @param serializedData the buffer holding the serialized form
 */
private CacheablePair(CacheableDeserializer<Cacheable> deserializer,
    ByteBuffer serializedData) {
  this.deserializer = deserializer;
  this.serializedData = serializedData;
  this.recentlyAccessed = new AtomicLong();
}
/**
 * Looks up the block cached under the given key, first in the RAM cache and then in the
 * backing map; in the latter case the block is read from the IO engine while the lock entry
 * for the bucket entry's offset is held.
 * @param key block's cache key
 * @param caching true if the caller caches blocks on cache misses
 * @param repeat Whether this is a repeat lookup for the same block
 * @param updateCacheMetrics Whether we should update cache metrics or not
 * @return buffer of specified cache key, or null if not in cache
 */
@Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
  if (!cacheEnabled) {
    return null;
  }

  final RAMQueueEntry ramEntry = ramCache.get(key);
  if (ramEntry != null) {
    if (updateCacheMetrics) {
      cacheStats.hit(caching);
    }
    ramEntry.access(accessCount.incrementAndGet());
    return ramEntry.getData();
  }

  final BucketEntry entry = backingMap.get(key);
  if (entry != null) {
    final long startNanos = System.nanoTime();
    IdLock.Entry heldLock = null;
    try {
      heldLock = offsetLock.getLockEntry(entry.offset());
      // Only read if the map still holds this very entry now that the lock is ours.
      if (entry.equals(backingMap.get(key))) {
        final int expected = entry.getLength();
        final ByteBuffer buffer = ByteBuffer.allocate(expected);
        // offset() is queried again on purpose rather than reusing an earlier value.
        final int actual = ioEngine.read(buffer, entry.offset());
        if (actual != expected) {
          throw new RuntimeException("Only " + actual + " bytes read, " + expected
              + " expected");
        }
        final CacheableDeserializer<Cacheable> blockDeserializer =
            entry.deserializerReference(this.deserialiserMap);
        final Cacheable block = blockDeserializer.deserialize(buffer, true);
        final long elapsedNanos = System.nanoTime() - startNanos;
        if (updateCacheMetrics) {
          cacheStats.hit(caching);
          cacheStats.ioHit(elapsedNanos);
        }
        entry.access(accessCount.incrementAndGet());
        if (this.ioErrorStartTime > 0) {
          ioErrorStartTime = -1;
        }
        return block;
      }
    } catch (IOException ioex) {
      LOG.error("Failed reading block " + key + " from bucket cache", ioex);
      checkIOErrorIsTolerated();
    } finally {
      // The lock entry is only non-null once getLockEntry has returned.
      if (heldLock != null) {
        offsetLock.releaseLockEntry(heldLock);
      }
    }
  }

  if (updateCacheMetrics && !repeat) {
    cacheStats.miss(caching);
  }
  return null;
}
/**
 * Returns whatever the static {@code deserializer} holder currently yields from {@code get()}.
 * @return the value currently held by the {@code deserializer} holder
 */
public static CacheableDeserializer<Cacheable> getDeserializer() {
  final CacheableDeserializer<Cacheable> current = deserializer.get();
  return current;
}
/**
 * Passes the given deserializer to {@code set} on the static {@code deserializer} holder.
 * NOTE(review): despite the "setSerializer" name, the value stored is a deserializer.
 * @param ser the deserializer to store
 */
public static void setSerializer(CacheableDeserializer<Cacheable> ser) { deserializer.set(ser); }
/**
 * Returns the deserializer held in {@code ByteArrayCacheable.deserializer}.
 * @return the value of {@code ByteArrayCacheable.deserializer}
 */
@Override
public CacheableDeserializer<Cacheable> getDeserializer() {
  final CacheableDeserializer<Cacheable> shared = ByteArrayCacheable.deserializer;
  return shared;
}
/**
 * Transfers data from IOEngine to a Cacheable object.
 * @param offset The offset in the IO engine where the first byte to be read
 * @param length How many bytes to be read from the offset
 * @param deserializer The deserializer to be used to make a Cacheable from the data.
 * @return Cacheable
 * @throws IOException on an I/O failure while reading
 * @throws RuntimeException when the length of the ByteBuff read is less than {@code length}
 */
Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer) throws IOException;