/**
 * Hand out a slot from a shared memory segment that is already known to
 * have room, i.e. one tracked in {@code notFull}.
 *
 * Must be called with the manager lock held.
 *
 * If the allocation fills the segment, the segment is moved from
 * {@code notFull} to {@code full}.
 *
 * @param blockId The blockId to put inside the Slot object.
 *
 * @return null if {@code notFull} is empty, i.e. none of our shared
 *         memory segments has a free slot; the new slot otherwise.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  final DfsClientShm segment = notFull.firstEntry().getValue();
  final ShmId segmentId = segment.getShmId();
  final Slot allocated = segment.allocAndRegisterSlot(blockId);
  final boolean exhausted = segment.isFull();
  if (LOG.isTraceEnabled()) {
    LOG.trace(this
        + (exhausted ? ": pulled the last slot " : ": pulled slot ")
        + allocated.getSlotIdx() + " out of " + segment);
  }
  if (exhausted) {
    // That was the final free slot: re-file the segment under "full".
    final DfsClientShm evicted = notFull.remove(segmentId);
    Preconditions.checkState(evicted == segment);
    full.put(segmentId, segment);
  }
  return allocated;
}
/**
 * Allocate a slot for the given block on the given datanode.
 *
 * Takes the manager lock for the duration of the call. The
 * EndpointShmManager for the datanode is looked up in {@code datanodes}
 * and created (and registered there) on first use; the actual allocation
 * is delegated to it.
 *
 * @param datanode   The datanode whose EndpointShmManager should be used.
 * @param peer       Passed through to EndpointShmManager#allocSlot.
 * @param usedPeer   Passed through to EndpointShmManager#allocSlot.
 * @param blockId    The block the slot is being allocated for.
 * @param clientName Passed through to EndpointShmManager#allocSlot.
 *
 * @return null if this manager has been closed; otherwise whatever
 *         EndpointShmManager#allocSlot returns.
 * @throws IOException declared on this method; presumably propagated from
 *         EndpointShmManager#allocSlot -- confirm against that method.
 */
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      // Guard the trace call so the message string is not built (while
      // holding the lock) when trace logging is off.
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": the DfsClientShmManager is closed.");
      }
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      // First request for this datanode: create and remember its manager.
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
/**
 * Allocate a new slot and register it.
 *
 * Picks the lowest clear index in {@code allocatedSlots}, marks it as
 * taken, builds a Slot at the address computed for that index, clears it,
 * marks it valid, and stores it in {@code slots}.
 *
 * @param blockId The blockId to associate with the new Slot.
 *
 * @return The new slot.
 * @throws RuntimeException if the lowest clear index is not below
 *         {@code slots.length}, i.e. no more slots are available.
 */
synchronized public final Slot allocAndRegisterSlot(
    ExtendedBlockId blockId) {
  final int freeIdx = allocatedSlots.nextClearBit(0);
  if (freeIdx >= slots.length) {
    throw new RuntimeException(this + ": no more slots are available.");
  }
  allocatedSlots.set(freeIdx, true);

  final Slot created = new Slot(calculateSlotAddress(freeIdx), blockId);
  created.clear();
  created.makeValid();
  slots[freeIdx] = created;

  if (LOG.isTraceEnabled()) {
    // The stack trace of the calling thread is appended to the message.
    LOG.trace(this + ": allocAndRegisterSlot " + freeIdx
        + ": allocatedSlots=" + allocatedSlots
        + StringUtils.getStackTrace(Thread.currentThread()));
  }
  return created;
}
/**
 * Create a replica backed by the given data and metadata streams.
 *
 * The metadata header is read from the metadata stream's channel during
 * construction and must report version 1.
 *
 * @param key            The block id this replica is stored under.
 * @param dataStream     Stream over the block data.
 * @param metaStream     Stream over the block metadata; its channel is
 *                       used to read the metadata header.
 * @param cache          The cache stored in this replica's cache field.
 * @param creationTimeMs Creation time, in milliseconds.
 * @param slot           The slot stored in this replica's slot field.
 *
 * @throws IOException if the header version is not 1, or as declared by
 *                     BlockMetadataHeader#preadHeader.
 */
public ShortCircuitReplica(ExtendedBlockId key,
    FileInputStream dataStream, FileInputStream metaStream,
    ShortCircuitCache cache, long creationTimeMs, Slot slot)
    throws IOException {
  // Plain copies of the constructor arguments.
  this.key = key;
  this.dataStream = dataStream;
  this.metaStream = metaStream;
  this.cache = cache;
  this.creationTimeMs = creationTimeMs;
  this.slot = slot;

  // Load and validate the metadata header.
  this.metaHeader =
      BlockMetadataHeader.preadHeader(metaStream.getChannel());
  if (this.metaHeader.getVersion() != 1) {
    throw new IOException("invalid metadata header version "
        + this.metaHeader.getVersion() + ". Can only handle version 1.");
  }
}
/**
 * Attempt to begin caching a block.
 *
 * If {@code mappableBlockMap} already has an entry for the block, the
 * request is counted in {@code numBlocksFailedToCache} and nothing else
 * happens. Otherwise an entry in state CACHING is recorded and a
 * CachingTask is handed to the supplied executor.
 *
 * @param blockId        Id of the block to cache.
 * @param bpid           Block pool id of the block.
 * @param blockFileName  Forwarded to the CachingTask.
 * @param length         Forwarded to the CachingTask.
 * @param genstamp       Forwarded to the CachingTask.
 * @param volumeExecutor Executor that runs the CachingTask.
 */
synchronized void cacheBlock(long blockId, String bpid,
    String blockFileName, long length, long genstamp,
    Executor volumeExecutor) {
  final ExtendedBlockId blockKey = new ExtendedBlockId(blockId, bpid);
  final Value existing = mappableBlockMap.get(blockKey);

  if (existing == null) {
    // Not tracked yet: mark it as CACHING and schedule the work.
    mappableBlockMap.put(blockKey, new Value(null, State.CACHING));
    volumeExecutor.execute(
        new CachingTask(blockKey, blockFileName, length, genstamp));
    LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
        bpid);
    return;
  }

  // Already tracked in some state: report it and count the failure.
  LOG.debug("Block with id {}, pool {} already exists in the "
      + "FsDatasetCache with state {}", blockId, bpid, existing.state);
  numBlocksFailedToCache.incrementAndGet();
}
/** * Invalidate a block but does not delete the actual on-disk block file. * * It should only be used when deactivating disks. * * @param bpid the block pool ID. * @param block The block to be invalidated. */ public void invalidate(String bpid, ReplicaInfo block) { // If a DFSClient has the replica in its cache of short-circuit file // descriptors (and the client is using ShortCircuitShm), invalidate it. // The short-circuit registry is null in the unit tests, because the // datanode is mock object. if (datanode.getShortCircuitRegistry() != null) { datanode.getShortCircuitRegistry().processBlockInvalidation( new ExtendedBlockId(block.getBlockId(), bpid)); // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, block.getBlockId()); } datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block), block.getStorageUuid()); }
/**
 * Invalidate any slot associated with a blockId that we are invalidating
 * (deleting) from this DataNode. When a slot is invalid, the DFSClient will
 * not use the corresponding replica for new read or mmap operations (although
 * existing, ongoing read or mmap operations will complete.)
 *
 * Does nothing when the registry is not enabled or when no slots are
 * registered for the block. Otherwise every affected slot is marked
 * invalid and a single INFO line listing them is logged.
 *
 * @param blockId The block ID.
 */
public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
  if (!enabled) {
    return;
  }
  final Set<Slot> affectedSlots = slots.get(blockId);
  if (affectedSlots.isEmpty()) {
    return;
  }

  final StringBuilder message = new StringBuilder("Block ");
  message.append(blockId)
      .append(" has been invalidated. ")
      .append("Marking short-circuit slots as invalid: ");

  boolean first = true;
  for (Slot slot : affectedSlots) {
    slot.makeInvalid();
    if (!first) {
      message.append(", ");
    }
    message.append(slot.toString());
    first = false;
  }
  LOG.info(message.toString());
}
/**
 * Compare the visited cache state against the expected counts.
 *
 * Each expectation is only enforced when its expected value is
 * non-negative; a negative expected value skips that check. Checks run
 * in the order: outstanding mmaps, replicas, evictable, evictable mmapped.
 * {@code failedLoads} is not inspected.
 */
@Override
public void visit(int numOutstandingMmaps,
    Map<ExtendedBlockId, ShortCircuitReplica> replicas,
    Map<ExtendedBlockId, InvalidToken> failedLoads,
    Map<Long, ShortCircuitReplica> evictable,
    Map<Long, ShortCircuitReplica> evictableMmapped) {
  // Decide up front which expectations are active.
  final boolean checkMmaps = expectedNumOutstandingMmaps >= 0;
  final boolean checkReplicas = expectedNumReplicas >= 0;
  final boolean checkEvictable = expectedNumEvictable >= 0;
  final boolean checkMmapedEvictable = expectedNumMmapedEvictable >= 0;

  if (checkMmaps) {
    Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
  }
  if (checkReplicas) {
    Assert.assertEquals(expectedNumReplicas, replicas.size());
  }
  if (checkEvictable) {
    Assert.assertEquals(expectedNumEvictable, evictable.size());
  }
  if (checkMmapedEvictable) {
    Assert.assertEquals(expectedNumMmapedEvictable,
        evictableMmapped.size());
  }
}
/**
 * Take a slot from a shared memory segment that already exists and still
 * has room (the first entry of {@code notFull}).
 *
 * Must be called with the manager lock held.
 *
 * When the allocation uses up the segment's last slot, the segment is
 * moved from {@code notFull} to {@code full}.
 *
 * @param blockId The blockId to put inside the Slot object.
 *
 * @return null if {@code notFull} is empty, i.e. none of our shared
 *         memory segments has a free slot; the new slot otherwise.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  final Entry<ShmId, DfsClientShm> candidate = notFull.firstEntry();
  final DfsClientShm segment = candidate.getValue();
  final ShmId segmentId = segment.getShmId();
  final Slot allocated = segment.allocAndRegisterSlot(blockId);

  if (!segment.isFull()) {
    // Common case: the segment still has room, so it stays in notFull.
    LOG.trace("{}: pulled slot {} out of {}", this,
        allocated.getSlotIdx(), segment);
    return allocated;
  }

  // The segment is now exhausted: move it from notFull to full.
  LOG.trace("{}: pulled the last slot {} out of {}", this,
      allocated.getSlotIdx(), segment);
  final DfsClientShm evicted = notFull.remove(segmentId);
  Preconditions.checkState(evicted == segment);
  full.put(segmentId, segment);
  return allocated;
}