/**
 * Helper method that sets the cache size limit to either a high or a low limit.
 * If there is not enough free space to satisfy the high limit, it is set to the low limit.
 */
@GuardedBy("mLock")
private void updateFileCacheSizeLimit() {
  // Test if mCacheSizeLimit can be set to the high limit
  boolean isAvailableSpaceLowerThanHighLimit;
  StatFsHelper.StorageType storageType =
      mStorage.isExternal()
          ? StatFsHelper.StorageType.EXTERNAL
          : StatFsHelper.StorageType.INTERNAL;
  isAvailableSpaceLowerThanHighLimit =
      mStatFsHelper.testLowDiskSpace(storageType, mDefaultCacheSizeLimit - mCacheStats.getSize());
  if (isAvailableSpaceLowerThanHighLimit) {
    mCacheSizeLimit = mLowDiskSpaceCacheSizeLimit;
  } else {
    mCacheSizeLimit = mDefaultCacheSizeLimit;
  }
}
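// Standalone sketch (not the Fresco API) of the same high/low limit decision, using
// java.io.File.getUsableSpace() in place of StatFsHelper; the directory, limits, and
// method names here are illustrative assumptions, not the original implementation.
final class CacheLimitDemo {
  /** Returns the high limit if the volume can still absorb it, otherwise the low limit. */
  static long chooseCacheSizeLimit(File cacheDir, long highLimit, long lowLimit, long currentCacheSize) {
    long extraBytesNeeded = highLimit - currentCacheSize; // room the high limit would still require
    boolean enoughFreeSpace = cacheDir.getUsableSpace() >= extraBytesNeeded;
    return enoughFreeSpace ? highLimit : lowLimit;
  }

  public static void main(String[] args) {
    java.io.File cacheDir = new java.io.File(System.getProperty("java.io.tmpdir"));
    long limit = chooseCacheSizeLimit(cacheDir, 40L << 20, 10L << 20, 5L << 20);
    System.out.println("chosen limit: " + limit + " bytes");
  }
}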
/**
 * Removes an entry from within a table. All entries following the removed node can stay, but
 * all preceding ones need to be cloned.
 *
 * <p>This method does not decrement count for the removed entry, but does decrement count for
 * all partially collected entries which are skipped. As such, callers which are modifying count
 * must re-read it after calling removeFromChain.
 *
 * @param first the first entry of the table
 * @param entry the entry being removed from the table
 * @return the new first entry for the table
 */
@GuardedBy("this")
E removeFromChain(E first, E entry) {
  int newCount = count;
  E newFirst = entry.getNext();
  for (E e = first; e != entry; e = e.getNext()) {
    E next = copyEntry(e, newFirst);
    if (next != null) {
      newFirst = next;
    } else {
      newCount--;
    }
  }
  this.count = newCount;
  return newFirst;
}
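// Standalone sketch (not the original map code) of the same copy-on-remove idea on a
// hypothetical immutable-next Node chain: entries after the removed node are reused
// as-is, while entries before it are cloned onto the new head (their relative order
// ends up reversed, which is acceptable for a hash chain).
final class ChainRemovalDemo {
  static final class Node {
    final String key;
    final Node next;

    Node(String key, Node next) {
      this.key = key;
      this.next = next;
    }
  }

  static Node removeFromChain(Node first, Node target) {
    Node newFirst = target.next; // keep everything after the removed node
    for (Node e = first; e != target; e = e.next) {
      newFirst = new Node(e.key, newFirst); // clone each predecessor onto the new head
    }
    return newFirst;
  }

  public static void main(String[] args) {
    Node c = new Node("c", null);
    Node b = new Node("b", c);
    Node a = new Node("a", b);
    for (Node e = removeFromChain(a, b); e != null; e = e.next) {
      System.out.println(e.key); // prints "a" then "c"
    }
  }
}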
/**
 * Create a new history proxy for a given shard.
 *
 * @throws InversibleLockException if the shard is being reconnected
 */
@GuardedBy("lock")
private ProxyHistory createHistoryProxy(final Long shard) {
  final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
  final LocalHistoryIdentifier proxyId =
      new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), shard);
  LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);

  final ProxyHistory ret = createHistoryProxy(proxyId, connection);

  // Request creation of the history, if it is not the single history
  if (ret.getIdentifier().getHistoryId() != 0) {
    connection.sendRequest(
        new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
        this::createHistoryCallback);
  }
  return ret;
}
/**
 * Updates eviction metadata that {@code entry} was just written. This currently amounts to
 * adding {@code entry} to relevant eviction lists.
 */
@GuardedBy("this")
void recordWrite(ReferenceEntry<K, V> entry, int weight, long now) {
  // we are already under lock, so drain the recency queue immediately
  drainRecencyQueue();
  totalWeight += weight;

  if (map.recordsAccess()) {
    entry.setAccessTime(now);
  }
  if (map.recordsWrite()) {
    entry.setWriteTime(now);
  }
  accessQueue.add(entry);
  writeQueue.add(entry);
}
/**
 * Process a ready transaction. The caller needs to ensure that
 * each transaction is seen only once by this method.
 *
 * @param tx Transaction which needs processing.
 */
@GuardedBy("this")
private void processTransaction(@Nonnull final PingPongTransaction tx) {
  if (failed) {
    LOG.debug("Cancelling transaction {}", tx);
    tx.getTransaction().cancel();
    return;
  }

  LOG.debug("Submitting transaction {}", tx);
  if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) {
    LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx);
  }

  Futures.addCallback(tx.getTransaction().submit(), new FutureCallback<Void>() {
    @Override
    public void onSuccess(final Void result) {
      transactionSuccessful(tx, result);
    }

    @Override
    public void onFailure(final Throwable t) {
      transactionFailed(tx, t);
    }
  }, MoreExecutors.directExecutor());
}
/**
 * Performs eviction if the segment is over capacity. Avoids flushing the entire cache if the
 * newest entry exceeds the maximum weight all on its own.
 *
 * @param newest the most recently added entry
 */
@GuardedBy("this")
void evictEntries(ReferenceEntry<K, V> newest) {
  if (!map.evictsBySize()) {
    return;
  }

  drainRecencyQueue();

  // If the newest entry by itself is too heavy for the segment, don't bother evicting
  // anything else, just that
  if (newest.getValueReference().getWeight() > maxSegmentWeight) {
    if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
      throw new AssertionError();
    }
  }

  while (totalWeight > maxSegmentWeight) {
    ReferenceEntry<K, V> e = getNextEvictable();
    if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
      throw new AssertionError();
    }
  }
}
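// Hedged usage sketch of the public CacheBuilder weight settings that evictEntries and
// getNextEvictable enforce internally; the 1 MiB budget, String types, and weigher are
// illustrative choices, not taken from the original sources.
final class WeightedCacheDemo {
  public static void main(String[] args) {
    com.google.common.cache.Cache<String, String> cache =
        com.google.common.cache.CacheBuilder.newBuilder()
            .maximumWeight(1_048_576) // total weight budget across all entries
            .weigher(new com.google.common.cache.Weigher<String, String>() {
              @Override
              public int weigh(String key, String value) {
                return value.length(); // per-entry weight used by size-based eviction
              }
            })
            .build();
    cache.put("greeting", "hello");
    System.out.println(cache.getIfPresent("greeting"));
  }
}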
@GuardedBy("this") @Nullable ReferenceEntry<K, V> removeValueFromChain( ReferenceEntry<K, V> first, ReferenceEntry<K, V> entry, @Nullable K key, int hash, V value, ValueReference<K, V> valueReference, RemovalCause cause) { enqueueNotification(key, hash, value, valueReference.getWeight(), cause); writeQueue.remove(entry); accessQueue.remove(entry); if (valueReference.isLoading()) { valueReference.notifyNewValue(null); return first; } else { return removeEntryFromChain(first, entry); } }
/**
 * Records that the current thread is no longer waiting on the specified guard.
 */
@GuardedBy("lock")
private void endWaitingFor(Guard guard) {
  int waiters = --guard.waiterCount;
  if (waiters == 0) {
    // unlink guard from activeGuards
    for (Guard p = activeGuards, pred = null; ; pred = p, p = p.next) {
      if (p == guard) {
        if (pred == null) {
          activeGuards = p.next;
        } else {
          pred.next = p.next;
        }
        p.next = null; // help GC
        break;
      }
    }
  }
}
/**
 * Caller should check before calling that guard is not satisfied.
 */
@GuardedBy("lock")
private boolean awaitNanos(Guard guard, long nanos, boolean signalBeforeWaiting)
    throws InterruptedException {
  boolean firstTime = true;
  try {
    do {
      if (nanos <= 0L) {
        return false;
      }
      if (firstTime) {
        if (signalBeforeWaiting) {
          signalNextWaiter();
        }
        beginWaitingFor(guard);
        firstTime = false;
      }
      nanos = guard.condition.awaitNanos(nanos);
    } while (!guard.isSatisfied());
    return true;
  } finally {
    if (!firstTime) {
      endWaitingFor(guard);
    }
  }
}
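// Hedged usage sketch of the public com.google.common.util.concurrent.Monitor API whose
// internal helpers (awaitNanos, endWaitingFor, isSatisfied) appear in this listing;
// SafeBox is an illustrative class, not part of the original sources.
final class SafeBox<V> {
  private final com.google.common.util.concurrent.Monitor monitor =
      new com.google.common.util.concurrent.Monitor();
  private final com.google.common.util.concurrent.Monitor.Guard valuePresent =
      new com.google.common.util.concurrent.Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
          return value != null;
        }
      };
  private V value;

  /** Blocks until a value is present, then takes it. */
  V take() throws InterruptedException {
    monitor.enterWhen(valuePresent);
    try {
      V result = value;
      value = null;
      return result;
    } finally {
      monitor.leave();
    }
  }

  /** Stores a value; threads blocked in take() are re-evaluated when the monitor is released. */
  void put(V newValue) {
    monitor.enter();
    try {
      value = newValue;
    } finally {
      monitor.leave();
    }
  }
}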
@GuardedBy("this") boolean removeEntryForTesting(E entry) { int hash = entry.getHash(); int newCount = this.count - 1; AtomicReferenceArray<E> table = this.table; int index = hash & (table.length() - 1); E first = table.get(index); for (E e = first; e != null; e = e.getNext()) { if (e == entry) { ++modCount; E newFirst = removeFromChain(first, e); newCount = this.count - 1; table.set(index, newFirst); this.count = newCount; // write-volatile return true; } } return false; }
/**
 * Copies {@code original} into a new entry chained to {@code newNext}. Returns the new entry,
 * or {@code null} if {@code original} was already garbage collected.
 */
@GuardedBy("this")
ReferenceEntry<K, V> copyEntry(ReferenceEntry<K, V> original, ReferenceEntry<K, V> newNext) {
  if (original.getKey() == null) {
    // key collected
    return null;
  }

  ValueReference<K, V> valueReference = original.getValueReference();
  V value = valueReference.get();
  if ((value == null) && valueReference.isActive()) {
    // value collected
    return null;
  }

  ReferenceEntry<K, V> newEntry = map.entryFactory.copyEntry(this, original, newNext);
  newEntry.setValueReference(valueReference.copyFor(this.valueReferenceQueue, value, newEntry));
  return newEntry;
}
/**
 * Schedule a timer to fire on the actor thread after a delay.
 *
 * @param delay Delay, in nanoseconds
 */
@GuardedBy("lock")
private void scheduleTimer(final long delay) {
  if (haveTimer) {
    LOG.debug("{}: timer already scheduled on {}", context.persistenceId(), this);
    return;
  }
  if (queue.hasSuccessor()) {
    LOG.debug("{}: connection {} has a successor, not scheduling timer", context.persistenceId(), this);
    return;
  }

  // If the delay is negative, we need to schedule an action immediately. The caller could have
  // checked for that condition and taken appropriate action, but handling it here is more
  // convenient and less error-prone.
  final long normalized =
      delay <= 0 ? 0 : Math.min(delay, context.config().getBackendAlivenessTimerInterval());
  final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
  LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur);
  context.executeInActor(this::runTimer, dur);
  haveTimer = true;
}
@GuardedBy("this") void expireEntries(long now) { drainRecencyQueue(); ReferenceEntry<K, V> e; while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } }
@GuardedBy("this") @Nullable ReferenceEntry<K, V> removeEntryFromChain( ReferenceEntry<K, V> first, ReferenceEntry<K, V> entry) { int newCount = count; ReferenceEntry<K, V> newFirst = entry.getNext(); for (ReferenceEntry<K, V> e = first; e != entry; e = e.getNext()) { ReferenceEntry<K, V> next = copyEntry(e, newFirst); if (next != null) { newFirst = next; } else { removeCollectedEntry(e); newCount--; } } this.count = newCount; return newFirst; }
@GuardedBy("monitor") private void terminated(final State from) { switch (from) { case NEW: TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners); break; case RUNNING: TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners); break; case STOPPING: TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners); break; case STARTING: case TERMINATED: case FAILED: default: throw new AssertionError(); } }
/**
 * (Re)calculate the stats.
 * It is the caller's responsibility to ensure thread-safety.
 * Assumes that it is called after initialization (or at the end of it).
 */
@GuardedBy("lock")
private void updateStats() {
  mInternalStatFs = updateStatsHelper(mInternalStatFs, mInternalPath);
  mExternalStatFs = updateStatsHelper(mExternalStatFs, mExternalPath);
  mLastRestatTime = SystemClock.uptimeMillis();
}
@GuardedBy("mLock") private void checkState(UserCallback mode) { if (mInWhichUserCallback != mode) { throw new IllegalStateException( "Expected " + mode + ", but was " + mInWhichUserCallback); } }
@GuardedBy("mUrlRequestAdapterLock") private void startInternalLocked() { if (mRequestMetricsAccumulator != null) { mRequestMetricsAccumulator.onRequestStarted(); } nativeStart(mUrlRequestAdapter); }
@GuardedBy("mNativeStreamLock") private void destroyNativeStreamLocked(boolean sendOnCanceled) { Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStreamLocked " + this.toString()); if (mNativeStream == 0) { return; } nativeDestroy(mNativeStream, sendOnCanceled); mNativeStream = 0; mRequestContext.onRequestDestroyed(); if (mOnDestroyedCallbackForTesting != null) { mOnDestroyedCallbackForTesting.run(); } }
@GuardedBy("this") void drainKeyReferenceQueue(ReferenceQueue<K> keyReferenceQueue) { Reference<? extends K> ref; int i = 0; while ((ref = keyReferenceQueue.poll()) != null) { @SuppressWarnings("unchecked") E entry = (E) ref; map.reclaimKey(entry); if (++i == DRAIN_MAX) { break; } } }
@GuardedBy("this") void drainValueReferenceQueue(ReferenceQueue<V> valueReferenceQueue) { Reference<? extends V> ref; int i = 0; while ((ref = valueReferenceQueue.poll()) != null) { @SuppressWarnings("unchecked") WeakValueReference<K, V, E> valueReference = (WeakValueReference<K, V, E>) ref; map.reclaimValue(valueReference); if (++i == DRAIN_MAX) { break; } } }
/**
 * Sets a new value of an entry. Adds newly created entries at the end of the access queue.
 */
@GuardedBy("this")
void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
  ValueReference<K, V> previous = entry.getValueReference();
  int weight = map.weigher.weigh(key, value);
  checkState(weight >= 0, "Weights must be non-negative");

  ValueReference<K, V> valueReference =
      map.valueStrength.referenceValue(this, entry, value, weight);
  entry.setValueReference(valueReference);
  recordWrite(entry, weight, now);
  previous.notifyNewValue(value);
}
/**
 * Drain the key and value reference queues, cleaning up internal entries containing garbage
 * collected keys or values.
 */
@GuardedBy("this")
void drainReferenceQueues() {
  if (map.usesKeyReferences()) {
    drainKeyReferenceQueue();
  }
  if (map.usesValueReferences()) {
    drainValueReferenceQueue();
  }
}
@GuardedBy("this") void drainKeyReferenceQueue() { Reference<? extends K> ref; int i = 0; while ((ref = keyReferenceQueue.poll()) != null) { @SuppressWarnings("unchecked") ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref; map.reclaimKey(entry); if (++i == DRAIN_MAX) { break; } } }
@GuardedBy("this") void drainValueReferenceQueue() { Reference<? extends V> ref; int i = 0; while ((ref = valueReferenceQueue.poll()) != null) { @SuppressWarnings("unchecked") ValueReference<K, V> valueReference = (ValueReference<K, V>) ref; map.reclaimValue(valueReference); if (++i == DRAIN_MAX) { break; } } }
/**
 * Updates the eviction metadata that {@code entry} was just read. This currently amounts to
 * adding {@code entry} to relevant eviction lists.
 *
 * <p>Note: this method should only be called under lock, as it directly manipulates the
 * eviction queues. Unlocked reads should use {@link #recordRead}.
 */
@GuardedBy("this")
void recordLockedRead(ReferenceEntry<K, V> entry, long now) {
  if (map.recordsAccess()) {
    entry.setAccessTime(now);
  }
  accessQueue.add(entry);
}
/**
 * Drains the recency queue, updating eviction metadata that the entries therein were read in
 * the specified relative order. This currently amounts to adding them to relevant eviction
 * lists (accounting for the fact that they could have been removed from the map since being
 * added to the recency queue).
 */
@GuardedBy("this")
void drainRecencyQueue() {
  ReferenceEntry<K, V> e;
  while ((e = recencyQueue.poll()) != null) {
    // An entry may be in the recency queue despite it being removed from
    // the map. This can occur when the entry was concurrently read while a
    // writer is removing it from the segment or after a clear has removed
    // all of the segment's entries.
    if (accessQueue.contains(e)) {
      accessQueue.add(e);
    }
  }
}
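// Standalone sketch (not the original cache code) of the read-buffering idea behind
// drainRecencyQueue: reads record keys into a lock-free queue, and the locked drain
// replays them into the LRU order in one batch, skipping keys already removed. The
// class and field names are illustrative assumptions.
final class RecencyDemo<K> {
  private final java.util.Queue<K> recencyQueue = new java.util.concurrent.ConcurrentLinkedQueue<>();
  private final java.util.LinkedHashSet<K> accessOrder = new java.util.LinkedHashSet<>(); // guarded by "this"

  /** Called on every read; cheap and lock-free. */
  void recordRead(K key) {
    recencyQueue.offer(key);
  }

  /** Called under the lock before eviction decisions. */
  synchronized void drainRecencyQueue() {
    K key;
    while ((key = recencyQueue.poll()) != null) {
      // Skip keys removed from the map since the read was recorded.
      if (accessOrder.remove(key)) {
        accessOrder.add(key); // re-insert at the most-recently-used end
      }
    }
  }
}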
@GuardedBy("this") private boolean haveSubtree(final DOMDataTreeIdentifier subtree) { for (final DOMDataTreeIdentifier i : idToShard.keySet()) { if (i.contains(subtree)) { return true; } } return false; }
@GuardedBy("this") ReferenceEntry<K, V> getNextEvictable() { for (ReferenceEntry<K, V> e : accessQueue) { int weight = e.getValueReference().getWeight(); if (weight > 0) { return e; } } throw new AssertionError(); }
@GuardedBy("listenerLock") private void notifyListeners(DOMEntity entity, boolean wasOwner, boolean isOwner, boolean hasOwner, Collection<ListenerActorRefEntry> listenerEntries) { DOMEntityOwnershipChange changed = new DOMEntityOwnershipChange(entity, EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner), inJeopardy); for (ListenerActorRefEntry entry: listenerEntries) { ActorRef listenerActor = entry.actorFor(); LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed); listenerActor.tell(changed, ActorRef.noSender()); } }
@GuardedBy("this") void removeCollectedEntry(ReferenceEntry<K, V> entry) { enqueueNotification( entry.getKey(), entry.getHash(), entry.getValueReference().get(), entry.getValueReference().getWeight(), RemovalCause.COLLECTED); writeQueue.remove(entry); accessQueue.remove(entry); }
@VisibleForTesting @GuardedBy("this") boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) { int newCount = this.count - 1; AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; int index = hash & (table.length() - 1); ReferenceEntry<K, V> first = table.get(index); for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) { if (e == entry) { ++modCount; ReferenceEntry<K, V> newFirst = removeValueFromChain( first, e, e.getKey(), hash, e.getValueReference().get(), e.getValueReference(), cause); newCount = this.count - 1; table.set(index, newFirst); this.count = newCount; // write-volatile return true; } } return false; }
/** Checks that the current state is equal to the expected state. */
@GuardedBy("monitor")
private void checkCurrentState(State expected) {
  State actual = state();
  if (actual != expected) {
    if (actual == FAILED) {
      // Handle this specially so that we can include the failureCause, if there is one.
      throw new IllegalStateException(
          "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
          failureCause());
    }
    throw new IllegalStateException(
        "Expected the service " + this + " to be " + expected + ", but was " + actual);
  }
}
@Override @GuardedBy("ServiceManagerState.this.monitor") public boolean isSatisfied() { // All services have started or some service has terminated/failed. return states.count(RUNNING) == numberOfServices || states.contains(STOPPING) || states.contains(TERMINATED) || states.contains(FAILED); }
@GuardedBy("monitor") void checkHealthy() { if (states.count(RUNNING) != numberOfServices) { IllegalStateException exception = new IllegalStateException( "Expected to be healthy after starting. The following services are not running: " + Multimaps.filterKeys(servicesByState, not(equalTo(RUNNING)))); throw exception; } }
/**
 * Exactly like guard.isSatisfied(), but in addition signals all waiting threads in the (hopefully
 * unlikely) event that isSatisfied() throws.
 */
@GuardedBy("lock")
private boolean isSatisfied(Guard guard) {
  try {
    return guard.isSatisfied();
  } catch (Throwable throwable) {
    signalAllWaiters();
    throw Throwables.propagate(throwable);
  }
}