/** * Pull a slot out of a preexisting shared memory segment. * * Must be called with the manager lock held. * * @param blockId The blockId to put inside the Slot object. * * @return null if none of our shared memory segments contain a * free slot; the slot object otherwise. */ private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) { if (notFull.isEmpty()) { return null; } Entry<ShmId, DfsClientShm> entry = notFull.firstEntry(); DfsClientShm shm = entry.getValue(); ShmId shmId = shm.getShmId(); Slot slot = shm.allocAndRegisterSlot(blockId); if (shm.isFull()) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() + " out of " + shm); } DfsClientShm removedShm = notFull.remove(shmId); Preconditions.checkState(removedShm == shm); full.put(shmId, shm); } else { if (LOG.isTraceEnabled()) { LOG.trace(this + ": pulled slot " + slot.getSlotIdx() + " out of " + shm); } } return slot; }
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer, MutableBoolean usedPeer, ExtendedBlockId blockId, String clientName) throws IOException { lock.lock(); try { if (closed) { LOG.trace(this + ": the DfsClientShmManager isclosed."); return null; } EndpointShmManager shmManager = datanodes.get(datanode); if (shmManager == null) { shmManager = new EndpointShmManager(datanode); datanodes.put(datanode, shmManager); } return shmManager.allocSlot(peer, usedPeer, clientName, blockId); } finally { lock.unlock(); } }
public ShortCircuitReplica(ExtendedBlockId key, FileInputStream dataStream, FileInputStream metaStream, ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException { this.key = key; this.dataStream = dataStream; this.metaStream = metaStream; this.metaHeader = BlockMetadataHeader.preadHeader(metaStream.getChannel()); if (metaHeader.getVersion() != 1) { throw new IOException("invalid metadata header version " + metaHeader.getVersion() + ". Can only handle version 1."); } this.cache = cache; this.creationTimeMs = creationTimeMs; this.slot = slot; }
public synchronized void removeShm(ShortCircuitShm shm) { if (LOG.isTraceEnabled()) { LOG.debug("removing shm " + shm); } // Stop tracking the shmId. RegisteredShm removedShm = segments.remove(shm.getShmId()); Preconditions.checkState(removedShm == shm, "failed to remove " + shm.getShmId()); // Stop tracking the slots. for (Iterator<Slot> iter = shm.slotIterator(); iter.hasNext(); ) { Slot slot = iter.next(); boolean removed = slots.remove(slot.getBlockId(), slot); Preconditions.checkState(removed); slot.makeInvalid(); } // De-allocate the memory map and close the shared file. shm.free(); }
/** * Invalidate any slot associated with a blockId that we are invalidating * (deleting) from this DataNode. When a slot is invalid, the DFSClient will * not use the corresponding replica for new read or mmap operations (although * existing, ongoing read or mmap operations will complete.) * * @param blockId The block ID. */ public synchronized void processBlockInvalidation(ExtendedBlockId blockId) { if (!enabled) return; final Set<Slot> affectedSlots = slots.get(blockId); if (!affectedSlots.isEmpty()) { final StringBuilder bld = new StringBuilder(); String prefix = ""; bld.append("Block ").append(blockId).append(" has been invalidated. "). append("Marking short-circuit slots as invalid: "); for (Slot slot : affectedSlots) { slot.makeInvalid(); bld.append(prefix).append(slot.toString()); prefix = ", "; } LOG.info(bld.toString()); } }
public synchronized void unregisterSlot(SlotId slotId) throws InvalidRequestException { if (!enabled) { if (LOG.isTraceEnabled()) { LOG.trace("unregisterSlot: ShortCircuitRegistry is " + "not enabled."); } throw new UnsupportedOperationException(); } ShmId shmId = slotId.getShmId(); RegisteredShm shm = segments.get(shmId); if (shm == null) { throw new InvalidRequestException("there is no shared memory segment " + "registered with shmId " + shmId); } Slot slot = shm.getSlot(slotId.getSlotIdx()); slot.makeInvalid(); shm.unregisterSlot(slotId.getSlotIdx()); slots.remove(slot.getBlockId(), slot); }
/** * Pull a slot out of a preexisting shared memory segment. * * Must be called with the manager lock held. * * @param blockId The blockId to put inside the Slot object. * * @return null if none of our shared memory segments contain a * free slot; the slot object otherwise. */ private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) { if (notFull.isEmpty()) { return null; } Entry<ShmId, DfsClientShm> entry = notFull.firstEntry(); DfsClientShm shm = entry.getValue(); ShmId shmId = shm.getShmId(); Slot slot = shm.allocAndRegisterSlot(blockId); if (shm.isFull()) { LOG.trace("{}: pulled the last slot {} out of {}", this, slot.getSlotIdx(), shm); DfsClientShm removedShm = notFull.remove(shmId); Preconditions.checkState(removedShm == shm); full.put(shmId, shm); } else { LOG.trace("{}: pulled slot {} out of {}", this, slot.getSlotIdx(), shm); } return slot; }
public void freeSlot(Slot slot) { lock.lock(); try { DfsClientShm shm = (DfsClientShm)slot.getShm(); shm.getEndpointShmManager().freeSlot(slot); } finally { lock.unlock(); } }
/** * Process a block mlock event from the FsDatasetCache. * * @param blockId The block that was mlocked. */ public synchronized void processBlockMlockEvent(ExtendedBlockId blockId) { if (!enabled) return; Set<Slot> affectedSlots = slots.get(blockId); for (Slot slot : affectedSlots) { slot.makeAnchorable(); } }
/** * Mark any slots associated with this blockId as unanchorable. * * @param blockId The block ID. * @return True if we should allow the munlock request. */ public synchronized boolean processBlockMunlockRequest( ExtendedBlockId blockId) { if (!enabled) return true; boolean allowMunlock = true; Set<Slot> affectedSlots = slots.get(blockId); for (Slot slot : affectedSlots) { slot.makeUnanchorable(); if (slot.isAnchored()) { allowMunlock = false; } } return allowMunlock; }
public synchronized String getClientNames(ExtendedBlockId blockId) { if (!enabled) return ""; final HashSet<String> clientNames = new HashSet<String>(); final Set<Slot> affectedSlots = slots.get(blockId); for (Slot slot : affectedSlots) { clientNames.add(((RegisteredShm)slot.getShm()).getClientName()); } return Joiner.on(",").join(clientNames); }
public synchronized void registerSlot(ExtendedBlockId blockId, SlotId slotId, boolean isCached) throws InvalidRequestException { if (!enabled) { if (LOG.isTraceEnabled()) { LOG.trace(this + " can't register a slot because the " + "ShortCircuitRegistry is not enabled."); } throw new UnsupportedOperationException(); } ShmId shmId = slotId.getShmId(); RegisteredShm shm = segments.get(shmId); if (shm == null) { throw new InvalidRequestException("there is no shared memory segment " + "registered with shmId " + shmId); } Slot slot = shm.registerSlot(slotId.getSlotIdx(), blockId); if (isCached) { slot.makeAnchorable(); } else { slot.makeUnanchorable(); } boolean added = slots.put(blockId, slot); Preconditions.checkState(added); if (LOG.isTraceEnabled()) { LOG.trace(this + ": registered " + blockId + " with slot " + slotId + " (isCached=" + isCached + ")"); } }
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache, final ExtendedBlock block, final boolean expectedIsAnchorable, final boolean expectedIsAnchored, final int expectedOutstandingMmaps) throws Exception { GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { final MutableBoolean result = new MutableBoolean(false); cache.accept(new CacheVisitor() { @Override public void visit(int numOutstandingMmaps, Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, InvalidToken> failedLoads, Map<Long, ShortCircuitReplica> evictable, Map<Long, ShortCircuitReplica> evictableMmapped) { Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps); ShortCircuitReplica replica = replicas.get(ExtendedBlockId.fromExtendedBlock(block)); Assert.assertNotNull(replica); Slot slot = replica.getSlot(); if ((expectedIsAnchorable != slot.isAnchorable()) || (expectedIsAnchored != slot.isAnchored())) { LOG.info("replica " + replica + " has isAnchorable = " + slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() + ". Waiting for isAnchorable = " + expectedIsAnchorable + ", isAnchored = " + expectedIsAnchored); return; } result.setValue(true); } }); return result.toBoolean(); } }, 10, 60000); }
static private void checkNumberOfSegmentsAndSlots(final int expectedSegments, final int expectedSlots, ShortCircuitRegistry registry) { registry.visit(new ShortCircuitRegistry.Visitor() { @Override public void accept(HashMap<ShmId, RegisteredShm> segments, HashMultimap<ExtendedBlockId, Slot> slots) { Assert.assertEquals(expectedSegments, segments.size()); Assert.assertEquals(expectedSlots, slots.size()); } }); }
@Test(timeout=60000) public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception { BlockReaderTestUtil.enableShortCircuitShmTracing(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); Configuration conf = createShortCircuitConf( "testPreReceiptVerificationDfsClientCanDoScr", sockDir); conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, 1000000000L); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); fs.getClient().getConf().brfFailureInjector = new TestPreReceiptVerificationFailureInjector(); final Path TEST_PATH1 = new Path("/test_file1"); DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2); final Path TEST_PATH2 = new Path("/test_file2"); DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2); DFSTestUtil.readFileBuffer(fs, TEST_PATH1); DFSTestUtil.readFileBuffer(fs, TEST_PATH2); ShortCircuitRegistry registry = cluster.getDataNodes().get(0).getShortCircuitRegistry(); registry.visit(new ShortCircuitRegistry.Visitor() { @Override public void accept(HashMap<ShmId, RegisteredShm> segments, HashMultimap<ExtendedBlockId, Slot> slots) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(2, slots.size()); } }); cluster.shutdown(); sockDir.close(); }