/** * Purge a replica from the cache. * * This doesn't necessarily close the replica, since there may be * outstanding references to it. However, it does mean the cache won't * hand it out to anyone after this. * * You must hold the cache lock while calling this function. * * @param replica The replica being removed. */ private void purge(ShortCircuitReplica replica) { boolean removedFromInfoMap = false; String evictionMapName = null; Preconditions.checkArgument(!replica.purged); replica.purged = true; Waitable<ShortCircuitReplicaInfo> val = replicaInfoMap.get(replica.key); if (val != null) { ShortCircuitReplicaInfo info = val.getVal(); if ((info != null) && (info.getReplica() == replica)) { replicaInfoMap.remove(replica.key); removedFromInfoMap = true; } } Long evictableTimeNs = replica.getEvictableTimeNs(); if (evictableTimeNs != null) { evictionMapName = removeEvictable(replica); } if (LOG.isTraceEnabled()) { StringBuilder builder = new StringBuilder(); builder.append(this).append(": ").append(": purged "). append(replica).append(" from the cache."); if (removedFromInfoMap) { builder.append(" Removed from the replicaInfoMap."); } if (evictionMapName != null) { builder.append(" Removed from ").append(evictionMapName); } LOG.trace(builder.toString()); } unref(replica); }
/** * Fetch or create a replica. * * You must hold the cache lock while calling this function. * * @param key Key to use for lookup. * @param creator Replica creator callback. Will be called without * the cache lock being held. * * @return Null if no replica could be found or created. * The replica, otherwise. */ public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, ShortCircuitReplicaCreator creator) { Waitable<ShortCircuitReplicaInfo> newWaitable = null; lock.lock(); try { ShortCircuitReplicaInfo info = null; do { if (closed) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": can't fetchOrCreate " + key + " because the cache is closed."); } return null; } Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key); if (waitable != null) { try { info = fetch(key, waitable); } catch (RetriableException e) { if (LOG.isDebugEnabled()) { LOG.debug(this + ": retrying " + e.getMessage()); } continue; } } } while (false); if (info != null) return info; // We need to load the replica ourselves. newWaitable = new Waitable<ShortCircuitReplicaInfo>(lock.newCondition()); replicaInfoMap.put(key, newWaitable); } finally { lock.unlock(); } return create(key, creator, newWaitable); }
/** * Fetch or create a replica. * * You must hold the cache lock while calling this function. * * @param key Key to use for lookup. * @param creator Replica creator callback. Will be called without * the cache lock being held. * * @return Null if no replica could be found or created. * The replica, otherwise. */ public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, ShortCircuitReplicaCreator creator) { Waitable<ShortCircuitReplicaInfo> newWaitable = null; lock.lock(); try { ShortCircuitReplicaInfo info = null; do { if (closed) { LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.", this, key); return null; } Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key); if (waitable != null) { try { info = fetch(key, waitable); } catch (RetriableException e) { LOG.debug("{}: retrying {}", this, e.getMessage()); } } } while (false); if (info != null) return info; // We need to load the replica ourselves. newWaitable = new Waitable<>(lock.newCondition()); replicaInfoMap.put(key, newWaitable); } finally { lock.unlock(); } return create(key, creator, newWaitable); }
private ShortCircuitReplicaInfo create(ExtendedBlockId key, ShortCircuitReplicaCreator creator, Waitable<ShortCircuitReplicaInfo> newWaitable) { // Handle loading a new replica. ShortCircuitReplicaInfo info = null; try { LOG.trace("{}: loading {}", this, key); info = creator.createShortCircuitReplicaInfo(); } catch (RuntimeException e) { LOG.warn(this + ": failed to load " + key, e); } if (info == null) info = new ShortCircuitReplicaInfo(); lock.lock(); try { if (info.getReplica() != null) { // On success, make sure the cache cleaner thread is running. LOG.trace("{}: successfully loaded {}", this, info.getReplica()); startCacheCleanerThreadIfNeeded(); // Note: new ShortCircuitReplicas start with a refCount of 2, // indicating that both this cache and whoever requested the // creation of the replica hold a reference. So we don't need // to increment the reference count here. } else { // On failure, remove the waitable from the replicaInfoMap. Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key); if (waitableInMap == newWaitable) replicaInfoMap.remove(key); if (info.getInvalidTokenException() != null) { LOG.info(this + ": could not load " + key + " due to InvalidToken " + "exception.", info.getInvalidTokenException()); } else { LOG.warn(this + ": failed to load " + key); } } newWaitable.provide(info); } finally { lock.unlock(); } return info; }
@VisibleForTesting // ONLY for testing public void accept(CacheVisitor visitor) { lock.lock(); try { Map<ExtendedBlockId, ShortCircuitReplica> replicas = new HashMap<>(); Map<ExtendedBlockId, InvalidToken> failedLoads = new HashMap<>(); for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry : replicaInfoMap.entrySet()) { Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue(); if (waitable.hasVal()) { if (waitable.getVal().getReplica() != null) { replicas.put(entry.getKey(), waitable.getVal().getReplica()); } else { // The exception may be null here, indicating a failed load that // isn't the result of an invalid block token. failedLoads.put(entry.getKey(), waitable.getVal().getInvalidTokenException()); } } } LOG.debug("visiting {} with outstandingMmapCount={}, replicas={}, " + "failedLoads={}, evictable={}, evictableMmapped={}", visitor.getClass().getName(), outstandingMmapCount, replicas, failedLoads, evictable, evictableMmapped); visitor.visit(outstandingMmapCount, replicas, failedLoads, evictable, evictableMmapped); } finally { lock.unlock(); } }
/** * Fetch an existing ReplicaInfo object. * * @param key The key that we're using. * @param waitable The waitable object to wait on. * @return The existing ReplicaInfo object, or null if there is * none. * * @throws RetriableException If the caller needs to retry. */ private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException { // Another thread is already in the process of loading this // ShortCircuitReplica. So we simply wait for it to complete. ShortCircuitReplicaInfo info; try { if (LOG.isTraceEnabled()) { LOG.trace(this + ": found waitable for " + key); } info = waitable.await(); } catch (InterruptedException e) { LOG.info(this + ": interrupted while waiting for " + key); Thread.currentThread().interrupt(); throw new RetriableException("interrupted"); } if (info.getInvalidTokenException() != null) { LOG.info(this + ": could not get " + key + " due to InvalidToken " + "exception.", info.getInvalidTokenException()); return info; } ShortCircuitReplica replica = info.getReplica(); if (replica == null) { LOG.warn(this + ": failed to get " + key); return info; } if (replica.purged) { // Ignore replicas that have already been purged from the cache. throw new RetriableException("Ignoring purged replica " + replica + ". Retrying."); } // Check if the replica is stale before using it. // If it is, purge it and retry. if (replica.isStale()) { LOG.info(this + ": got stale replica " + replica + ". Removing " + "this replica from the replicaInfoMap and retrying."); // Remove the cache's reference to the replica. This may or may not // trigger a close. purge(replica); throw new RetriableException("ignoring stale replica " + replica); } ref(replica); return info; }
private ShortCircuitReplicaInfo create(ExtendedBlockId key, ShortCircuitReplicaCreator creator, Waitable<ShortCircuitReplicaInfo> newWaitable) { // Handle loading a new replica. ShortCircuitReplicaInfo info = null; try { if (LOG.isTraceEnabled()) { LOG.trace(this + ": loading " + key); } info = creator.createShortCircuitReplicaInfo(); } catch (RuntimeException e) { LOG.warn(this + ": failed to load " + key, e); } if (info == null) info = new ShortCircuitReplicaInfo(); lock.lock(); try { if (info.getReplica() != null) { // On success, make sure the cache cleaner thread is running. if (LOG.isTraceEnabled()) { LOG.trace(this + ": successfully loaded " + info.getReplica()); } startCacheCleanerThreadIfNeeded(); // Note: new ShortCircuitReplicas start with a refCount of 2, // indicating that both this cache and whoever requested the // creation of the replica hold a reference. So we don't need // to increment the reference count here. } else { // On failure, remove the waitable from the replicaInfoMap. Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key); if (waitableInMap == newWaitable) replicaInfoMap.remove(key); if (info.getInvalidTokenException() != null) { LOG.info(this + ": could not load " + key + " due to InvalidToken " + "exception.", info.getInvalidTokenException()); } else { LOG.warn(this + ": failed to load " + key); } } newWaitable.provide(info); } finally { lock.unlock(); } return info; }
/** * Fetch an existing ReplicaInfo object. * * @param key The key that we're using. * @param waitable The waitable object to wait on. * @return The existing ReplicaInfo object, or null if there is * none. * * @throws RetriableException If the caller needs to retry. */ private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException { // Another thread is already in the process of loading this // ShortCircuitReplica. So we simply wait for it to complete. ShortCircuitReplicaInfo info; try { LOG.trace("{}: found waitable for {}", this, key); info = waitable.await(); } catch (InterruptedException e) { LOG.info(this + ": interrupted while waiting for " + key); Thread.currentThread().interrupt(); throw new RetriableException("interrupted"); } if (info.getInvalidTokenException() != null) { LOG.info(this + ": could not get " + key + " due to InvalidToken " + "exception.", info.getInvalidTokenException()); return info; } ShortCircuitReplica replica = info.getReplica(); if (replica == null) { LOG.warn(this + ": failed to get " + key); return info; } if (replica.purged) { // Ignore replicas that have already been purged from the cache. throw new RetriableException("Ignoring purged replica " + replica + ". Retrying."); } // Check if the replica is stale before using it. // If it is, purge it and retry. if (replica.isStale()) { LOG.info(this + ": got stale replica " + replica + ". Removing " + "this replica from the replicaInfoMap and retrying."); // Remove the cache's reference to the replica. This may or may not // trigger a close. purge(replica); throw new RetriableException("ignoring stale replica " + replica); } ref(replica); return info; }
/** * Fetch an existing ReplicaInfo object. * * @param key The key that we're using. * @param waitable The waitable object to wait on. * @return The existing ReplicaInfo object, or null if there is * none. * * @throws RetriableException If the caller needs to retry. */ private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException { // Another thread is already in the process of loading this // ShortCircuitReplica. So we simply wait for it to complete. ShortCircuitReplicaInfo info; try { if (LOG.isTraceEnabled()) { LOG.trace(this + ": found waitable for " + key); } info = waitable.await(); } catch (InterruptedException e) { LOG.info(this + ": interrupted while waiting for " + key); Thread.currentThread().interrupt(); throw new RetriableException("interrupted"); } if (info.getInvalidTokenException() != null) { LOG.warn(this + ": could not get " + key + " due to InvalidToken " + "exception.", info.getInvalidTokenException()); return info; } ShortCircuitReplica replica = info.getReplica(); if (replica == null) { LOG.warn(this + ": failed to get " + key); return info; } if (replica.purged) { // Ignore replicas that have already been purged from the cache. throw new RetriableException("Ignoring purged replica " + replica + ". Retrying."); } // Check if the replica is stale before using it. // If it is, purge it and retry. if (replica.isStale()) { LOG.info(this + ": got stale replica " + replica + ". Removing " + "this replica from the replicaInfoMap and retrying."); // Remove the cache's reference to the replica. This may or may not // trigger a close. purge(replica); throw new RetriableException("ignoring stale replica " + replica); } ref(replica); return info; }
private ShortCircuitReplicaInfo create(ExtendedBlockId key, ShortCircuitReplicaCreator creator, Waitable<ShortCircuitReplicaInfo> newWaitable) { // Handle loading a new replica. ShortCircuitReplicaInfo info = null; try { if (LOG.isTraceEnabled()) { LOG.trace(this + ": loading " + key); } info = creator.createShortCircuitReplicaInfo(); } catch (RuntimeException e) { LOG.warn(this + ": failed to load " + key, e); } if (info == null) info = new ShortCircuitReplicaInfo(); lock.lock(); try { if (info.getReplica() != null) { // On success, make sure the cache cleaner thread is running. if (LOG.isTraceEnabled()) { LOG.trace(this + ": successfully loaded " + info.getReplica()); } startCacheCleanerThreadIfNeeded(); // Note: new ShortCircuitReplicas start with a refCount of 2, // indicating that both this cache and whoever requested the // creation of the replica hold a reference. So we don't need // to increment the reference count here. } else { // On failure, remove the waitable from the replicaInfoMap. Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key); if (waitableInMap == newWaitable) replicaInfoMap.remove(key); if (info.getInvalidTokenException() != null) { LOG.warn(this + ": could not load " + key + " due to InvalidToken " + "exception.", info.getInvalidTokenException()); } else { LOG.warn(this + ": failed to load " + key); } } newWaitable.provide(info); } finally { lock.unlock(); } return info; }