/**
 * Takes a free slot from the lowest-keyed shared memory segment that still
 * has room, if any.
 *
 * Must be called with the manager lock held.
 *
 * @param blockId The blockId to put inside the Slot object.
 *
 * @return the newly allocated slot, or null when no tracked segment has a
 *         free slot left.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  final DfsClientShm candidate = notFull.firstEntry().getValue();
  final ShmId candidateId = candidate.getShmId();
  final Slot allocated = candidate.allocAndRegisterSlot(blockId);
  if (!candidate.isFull()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled slot " + allocated.getSlotIdx() +
          " out of " + candidate);
    }
    return allocated;
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": pulled the last slot " + allocated.getSlotIdx() +
        " out of " + candidate);
  }
  // The segment just ran out of free slots: move it from notFull to full.
  final DfsClientShm removed = notFull.remove(candidateId);
  Preconditions.checkState(removed == candidate);
  full.put(candidateId, candidate);
  return allocated;
}
/**
 * Removes a previously registered slot: invalidates it, releases it from its
 * shared memory segment, and drops it from the block-to-slot index.
 *
 * @param slotId identifies the segment and slot index to release.
 * @throws InvalidRequestException if no segment is registered under the
 *         slot's ShmId.
 * @throws UnsupportedOperationException if this registry is not enabled.
 */
public synchronized void unregisterSlot(SlotId slotId)
    throws InvalidRequestException {
  if (!enabled) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("unregisterSlot: ShortCircuitRegistry is not enabled.");
    }
    throw new UnsupportedOperationException();
  }
  final ShmId owningShmId = slotId.getShmId();
  final RegisteredShm owningShm = segments.get(owningShmId);
  if (owningShm == null) {
    throw new InvalidRequestException(
        "there is no shared memory segment registered with shmId " +
        owningShmId);
  }
  final int slotIdx = slotId.getSlotIdx();
  final Slot target = owningShm.getSlot(slotIdx);
  target.makeInvalid();
  owningShm.unregisterSlot(slotIdx);
  slots.remove(target.getBlockId(), target);
}
/**
 * Takes a free slot from the lowest-keyed shared memory segment that still
 * has room, if any.
 *
 * Must be called with the manager lock held.
 *
 * @param blockId The blockId to put inside the Slot object.
 *
 * @return the newly allocated slot, or null when no tracked segment has a
 *         free slot left.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  final DfsClientShm candidate = notFull.firstEntry().getValue();
  final ShmId candidateId = candidate.getShmId();
  final Slot allocated = candidate.allocAndRegisterSlot(blockId);
  if (!candidate.isFull()) {
    LOG.trace("{}: pulled slot {} out of {}", this, allocated.getSlotIdx(),
        candidate);
    return allocated;
  }
  LOG.trace("{}: pulled the last slot {} out of {}", this,
      allocated.getSlotIdx(), candidate);
  // The segment just ran out of free slots: move it from notFull to full.
  final DfsClientShm removed = notFull.remove(candidateId);
  Preconditions.checkState(removed == candidate);
  full.put(candidateId, candidate);
  return allocated;
}
/**
 * Handle a DFSClient request to create a new memory segment.
 *
 * @param clientName    Client name as reported by the client.
 * @param sock          The DomainSocket to associate with this memory
 *                      segment.  When this socket is closed, or the
 *                      other side writes anything to the socket, the
 *                      segment will be closed.  This can happen at any
 *                      time, including right after this function returns.
 * @return              A NewShmInfo object.  The caller must close the
 *                      NewShmInfo object once they are done with it.
 * @throws IOException  If the new memory segment could not be created.
 * @throws UnsupportedOperationException If this registry is not enabled.
 */
public NewShmInfo createNewMemorySegment(String clientName,
    DomainSocket sock) throws IOException {
  NewShmInfo info = null;
  RegisteredShm shm = null;
  ShmId shmId = null;
  synchronized (this) {
    if (!enabled) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("createNewMemorySegment: ShortCircuitRegistry is " +
            "not enabled.");
      }
      throw new UnsupportedOperationException();
    }
    FileInputStream fis = null;
    try {
      // Draw random ids until one is found that no registered segment uses.
      do {
        shmId = ShmId.createRandom();
      } while (segments.containsKey(shmId));
      fis = shmFactory.createDescriptor(clientName, SHM_LENGTH);
      shm = new RegisteredShm(clientName, shmId, fis, this);
    } finally {
      // shm is still null only if createDescriptor or the RegisteredShm
      // constructor threw; close the descriptor so it is not leaked.
      if (shm == null) {
        IOUtils.closeQuietly(fis);
      }
    }
    // On success fis is stored in the returned NewShmInfo; per the @return
    // contract above, the caller is responsible for closing it.
    info = new NewShmInfo(shmId, fis);
    segments.put(shmId, shm);
  }
  // Drop the registry lock to prevent deadlock.
  // After this point, RegisteredShm#handle may be called at any time.
  watcher.add(sock, shm);
  if (LOG.isTraceEnabled()) {
    LOG.trace("createNewMemorySegment: created " + info.shmId);
  }
  return info;
}
/**
 * Registers a client-allocated slot for a block in an existing shared memory
 * segment and records it in the block-to-slot index.
 *
 * @param blockId  the block the slot refers to.
 * @param slotId   identifies the segment and slot index to register.
 * @param isCached true to mark the slot anchorable, false to mark it
 *                 unanchorable.
 * @throws InvalidRequestException if no segment is registered under the
 *         slot's ShmId.
 * @throws UnsupportedOperationException if this registry is not enabled.
 */
public synchronized void registerSlot(ExtendedBlockId blockId, SlotId slotId,
    boolean isCached) throws InvalidRequestException {
  if (!enabled) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + " can't register a slot because the " +
          "ShortCircuitRegistry is not enabled.");
    }
    throw new UnsupportedOperationException();
  }
  final ShmId targetShmId = slotId.getShmId();
  final RegisteredShm targetShm = segments.get(targetShmId);
  if (targetShm == null) {
    throw new InvalidRequestException(
        "there is no shared memory segment registered with shmId " +
        targetShmId);
  }
  final Slot newSlot = targetShm.registerSlot(slotId.getSlotIdx(), blockId);
  if (!isCached) {
    newSlot.makeUnanchorable();
  } else {
    newSlot.makeAnchorable();
  }
  // The (blockId, slot) pair must not already be present in the index.
  final boolean inserted = slots.put(blockId, newSlot);
  Preconditions.checkState(inserted);
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": registered " + blockId + " with slot " + slotId +
        " (isCached=" + isCached + ")");
  }
}
/**
 * Converts a ShmId into its protobuf representation by copying its high and
 * low 64-bit halves.
 *
 * @param shmId the id to convert.
 * @return a newly built ShortCircuitShmIdProto carrying the same hi/lo values.
 */
public static ShortCircuitShmIdProto convert(ShmId shmId) {
  final long hi = shmId.getHi();
  final long lo = shmId.getLo();
  return ShortCircuitShmIdProto.newBuilder().setHi(hi).setLo(lo).build();
}
/**
 * Creates a shared file descriptor, maps a ShortCircuitShm over it and frees
 * it again.
 *
 * The descriptor stream is closed and the test directory deleted in finally
 * blocks, so a failure part-way through does not leak the file descriptor or
 * leave the directory behind. On the success path the order is unchanged:
 * free the segment, close the stream, then delete the directory.
 */
@Test(timeout=60000)
public void testStartupShutdown() throws Exception {
  File path = new File(TEST_BASE, "testStartupShutdown");
  path.mkdirs();
  try {
    SharedFileDescriptorFactory factory =
        SharedFileDescriptorFactory.create("shm_",
            new String[] { path.getAbsolutePath() });
    FileInputStream stream =
        factory.createDescriptor("testStartupShutdown", 4096);
    try {
      ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
      shm.free();
    } finally {
      stream.close();
    }
  } finally {
    FileUtil.fullyDelete(path);
  }
}
/**
 * Asserts that the registry currently tracks exactly the given number of
 * shared memory segments and registered slots.
 *
 * @param expectedSegments expected number of entries in the segment map.
 * @param expectedSlots    expected number of entries in the slot multimap.
 * @param registry         the registry to inspect.
 */
private static void checkNumberOfSegmentsAndSlots(final int expectedSegments,
    final int expectedSlots, ShortCircuitRegistry registry) {
  ShortCircuitRegistry.Visitor sizeChecker =
      new ShortCircuitRegistry.Visitor() {
        @Override
        public void accept(HashMap<ShmId, RegisteredShm> segments,
            HashMultimap<ExtendedBlockId, Slot> slots) {
          Assert.assertEquals(expectedSegments, segments.size());
          Assert.assertEquals(expectedSlots, slots.size());
        }
      };
  registry.visit(sizeChecker);
}
/**
 * With a TestPreReceiptVerificationFailureInjector installed on the client,
 * writes and reads two files, then checks that the DataNode's
 * ShortCircuitRegistry tracks exactly one segment and two slots.
 *
 * The MiniDFSCluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or I/O error does not leak them into
 * later tests. On the success path the order is unchanged: shut down the
 * cluster, then close the socket directory.
 */
@Test(timeout=60000)
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testPreReceiptVerificationDfsClientCanDoScr", sockDir);
    conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
        1000000000L);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      fs.getClient().getConf().brfFailureInjector =
          new TestPreReceiptVerificationFailureInjector();
      final Path TEST_PATH1 = new Path("/test_file1");
      DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
      final Path TEST_PATH2 = new Path("/test_file2");
      DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
      ShortCircuitRegistry registry =
          cluster.getDataNodes().get(0).getShortCircuitRegistry();
      registry.visit(new ShortCircuitRegistry.Visitor() {
        @Override
        public void accept(HashMap<ShmId, RegisteredShm> segments,
            HashMultimap<ExtendedBlockId, Slot> slots) {
          Assert.assertEquals(1, segments.size());
          Assert.assertEquals(2, slots.size());
        }
      });
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
/**
 * With a TestPreReceiptVerificationFailureInjector installed in
 * BlockReaderFactory, writes and reads two files, then checks that the
 * DataNode's ShortCircuitRegistry tracks exactly one segment and two slots.
 *
 * The MiniDFSCluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or I/O error does not leak them into
 * later tests. On the success path the order is unchanged: shut down the
 * cluster, then close the socket directory.
 */
@Test(timeout=60000)
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testPreReceiptVerificationDfsClientCanDoScr", sockDir);
    conf.setLong(
        HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
        1000000000L);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      // NOTE(review): setFailureInjectorForTesting is invoked statically, so
      // this injector presumably stays installed after the test finishes;
      // confirm whether it should be reset in a finally block.
      BlockReaderFactory.setFailureInjectorForTesting(
          new TestPreReceiptVerificationFailureInjector());
      final Path TEST_PATH1 = new Path("/test_file1");
      DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
      final Path TEST_PATH2 = new Path("/test_file2");
      DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
      DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
      ShortCircuitRegistry registry =
          cluster.getDataNodes().get(0).getShortCircuitRegistry();
      registry.visit(new ShortCircuitRegistry.Visitor() {
        @Override
        public void accept(HashMap<ShmId, RegisteredShm> segments,
            HashMultimap<ExtendedBlockId, Slot> slots) {
          Assert.assertEquals(1, segments.size());
          Assert.assertEquals(2, slots.size());
        }
      });
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
/** * Stop tracking a slot. * * Must be called with the EndpointShmManager lock held. * * @param slot The slot to release. */ void freeSlot(Slot slot) { DfsClientShm shm = (DfsClientShm)slot.getShm(); shm.unregisterSlot(slot.getSlotIdx()); if (shm.isDisconnected()) { // Stale shared memory segments should not be tracked here. Preconditions.checkState(!full.containsKey(shm.getShmId())); Preconditions.checkState(!notFull.containsKey(shm.getShmId())); if (shm.isEmpty()) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": freeing empty stale " + shm); } shm.free(); } } else { ShmId shmId = shm.getShmId(); full.remove(shmId); // The shm can't be full if we just freed a slot. if (shm.isEmpty()) { notFull.remove(shmId); // If the shared memory segment is now empty, we call shutdown(2) on // the UNIX domain socket associated with it. The DomainSocketWatcher, // which is watching this socket, will call DfsClientShm#handle, // cleaning up this shared memory segment. // // See #{DfsClientShmManager#domainSocketWatcher} for details about why // we don't want to call DomainSocketWatcher#remove directly here. // // Note that we could experience 'fragmentation' here, where the // DFSClient allocates a bunch of slots in different shared memory // segments, and then frees most of them, but never fully empties out // any segment. We make some attempt to avoid this fragmentation by // always allocating new slots out of the shared memory segment with the // lowest ID, but it could still occur. In most workloads, // fragmentation should not be a major concern, since it doesn't impact // peak file descriptor usage or the speed of allocation. if (LOG.isTraceEnabled()) { LOG.trace(this + ": shutting down UNIX domain socket for " + "empty " + shm); } shutdown(shm); } else { notFull.put(shmId, shm); } } }
/**
 * Bundles one DataNode's segment maps and disabled flag for a visitor.
 *
 * The maps are stored by reference, not copied.
 *
 * @param full     segments with no free slots, keyed by ShmId.
 * @param notFull  segments with at least one free slot, keyed by ShmId.
 * @param disabled the per-DataNode disabled flag (exact meaning not visible
 *                 here; TODO confirm against the caller).
 */
PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full,
    TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) {
  this.disabled = disabled;
  this.notFull = notFull;
  this.full = full;
}