private ClientContext(String name, Conf conf) { this.name = name; this.confString = confAsString(conf); this.shortCircuitCache = new ShortCircuitCache( conf.shortCircuitStreamsCacheSize, conf.shortCircuitStreamsCacheExpiryMs, conf.shortCircuitMmapCacheSize, conf.shortCircuitMmapCacheExpiryMs, conf.shortCircuitMmapCacheRetryTimeout, conf.shortCircuitCacheStaleThresholdMs, conf.shortCircuitSharedMemoryWatcherInterruptCheckMs); this.peerCache = new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry); this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs); this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal; this.domainSocketFactory = new DomainSocketFactory(conf); this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf); }
/**
 * Creates a client context called {@code name}.
 *
 * Most settings are read from the short-circuit configuration returned by
 * {@code conf.getShortCircuitConf()}; only the byte array manager settings
 * are read from {@code conf} directly.
 *
 * @param name the name stored on this context
 * @param conf the client configuration
 */
private ClientContext(String name, DfsClientConf conf) {
  final ShortCircuitConf shortCircuitConf = conf.getShortCircuitConf();

  this.name = name;
  this.confString = shortCircuitConf.confAsString();
  this.shortCircuitCache = ShortCircuitCache.fromConf(shortCircuitConf);

  // Peer (socket) cache.
  final PeerCache peers = new PeerCache(
      shortCircuitConf.getSocketCacheCapacity(),
      shortCircuitConf.getSocketCacheExpiry());
  this.peerCache = peers;

  // Key provider cache.
  final KeyProviderCache keyProviders =
      new KeyProviderCache(shortCircuitConf.getKeyProviderCacheExpiryMs());
  this.keyProviderCache = keyProviders;

  this.useLegacyBlockReaderLocal =
      shortCircuitConf.isUseLegacyBlockReaderLocal();

  final DomainSocketFactory socketFactory =
      new DomainSocketFactory(shortCircuitConf);
  this.domainSocketFactory = socketFactory;

  this.byteArrayManager =
      ByteArrayManager.newInstance(conf.getWriteByteArrayManagerConf());
}
private ClientContext(String name, Conf conf) { this.name = name; this.confString = confAsString(conf); this.shortCircuitCache = new ShortCircuitCache( conf.shortCircuitStreamsCacheSize, conf.shortCircuitStreamsCacheExpiryMs, conf.shortCircuitMmapCacheSize, conf.shortCircuitMmapCacheExpiryMs, conf.shortCircuitMmapCacheRetryTimeout, conf.shortCircuitCacheStaleThresholdMs, conf.shortCircuitSharedMemoryWatcherInterruptCheckMs); this.peerCache = new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry); this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal; this.domainSocketFactory = new DomainSocketFactory(conf); this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf); }
/**
 * Waits until the replica for {@code block} in {@code cache} reports the
 * expected anchorable/anchored flags on its slot.
 *
 * On every poll the cache is visited; the visit asserts that the number of
 * outstanding mmaps equals {@code expectedOutstandingMmaps} and that the
 * replica is present, so a mismatch there fails immediately rather than
 * being retried.
 *
 * @param cache the cache to inspect
 * @param block the block whose replica is checked
 * @param expectedIsAnchorable the awaited value of {@code isAnchorable()}
 * @param expectedIsAnchored the awaited value of {@code isAnchored()}
 * @param expectedOutstandingMmaps the asserted number of outstanding mmaps
 */
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
    final ExtendedBlock block, final boolean expectedIsAnchorable,
    final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
    throws Exception {
  final Supplier<Boolean> anchorStatusMatches = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      final MutableBoolean matched = new MutableBoolean(false);
      final CacheVisitor statusChecker = new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
          ShortCircuitReplica replica =
              replicas.get(ExtendedBlockId.fromExtendedBlock(block));
          Assert.assertNotNull(replica);
          Slot slot = replica.getSlot();
          if ((expectedIsAnchorable == slot.isAnchorable())
              && (expectedIsAnchored == slot.isAnchored())) {
            // Both flags match: report success for this poll.
            matched.setValue(true);
            return;
          }
          LOG.info("replica " + replica + " has isAnchorable = " +
              slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() +
              ". Waiting for isAnchorable = " + expectedIsAnchorable +
              ", isAnchored = " + expectedIsAnchored);
        }
      };
      cache.accept(statusChecker);
      return matched.toBoolean();
    }
  };
  // NOTE(review): presumably a 10 ms poll interval and a 60 s overall
  // timeout -- confirm against GenericTestUtils.waitFor.
  GenericTestUtils.waitFor(anchorStatusMatches, 10, 60000);
}
/**
 * Test that a client which does not support short-circuit reads using
 * shared memory can talk with a server which supports it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks so that a failing step or assertion does not leak them
 * into later tests.
 */
@Test
public void testShortCircuitReadFromClientWithoutShm() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadWithoutShm", sockDir);
    // The server keeps the unmodified settings; only the client conf is
    // changed after the cluster has been built.
    Configuration serverConf = new Configuration(clientConf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.setInt(
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromClientWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      // The test expects this client's cache to have no shm manager.
      Assert.assertNull(cache.getDfsClientShmManager());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
/**
 * Test shutting down the ShortCircuitCache while there are things in it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks so that a failing step or assertion does not leak them
 * into later tests.
 */
@Test
public void testShortCircuitCacheShutdown() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testShortCircuitCacheShutdown", sockDir);
    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
    Configuration serverConf = new Configuration(conf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back so the cache is populated before it is closed.
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      cache.close();
      // Closing the cache must also close the shm manager's socket watcher.
      Assert.assertTrue(cache.getDfsClientShmManager().
          getDomainSocketWatcher().isClosed());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
/**
 * Sets the log level to TRACE on the loggers named after
 * BlockReaderFactory, ShortCircuitCache, ShortCircuitReplica and
 * BlockReaderLocal, in that order.
 */
public static void enableBlockReaderFactoryTracing() {
  final Class<?>[] tracedClasses = {
      BlockReaderFactory.class,
      ShortCircuitCache.class,
      ShortCircuitReplica.class,
      BlockReaderLocal.class
  };
  for (Class<?> traced : tracedClasses) {
    LogManager.getLogger(traced.getName()).setLevel(Level.TRACE);
  }
}
/**
 * Tries to build a BlockReaderLocal for a short-circuit read of the
 * current block.
 *
 * @return the reader, or {@code null} when the domain socket path is not
 *         usable for short circuit or no replica could be obtained from
 *         the short-circuit cache
 * @throws InvalidToken when the replica info fetched from the cache carries
 *         an InvalidToken exception
 */
private BlockReader getBlockReaderLocal() throws InvalidToken {
  LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
      + " reads.", this);

  // Resolve the path info lazily; later calls reuse the stored value.
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory()
        .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
  }

  final boolean usable = pathInfo.getPathState().getUsableForShortCircuit();
  if (!usable) {
    PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; "
        + "giving up on BlockReaderLocal.", this, pathInfo);
    return null;
  }

  final ShortCircuitCache scCache = clientContext.getShortCircuitCache();
  final ExtendedBlockId blockKey =
      new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
  final ShortCircuitReplicaInfo replicaInfo =
      scCache.fetchOrCreate(blockKey, this);

  final InvalidToken tokenError = replicaInfo.getInvalidTokenException();
  if (tokenError != null) {
    LOG.trace("{}: got InvalidToken exception while trying to construct "
        + "BlockReaderLocal via {}", this, pathInfo.getPath());
    throw tokenError;
  }

  if (replicaInfo.getReplica() == null) {
    PerformanceAdvisory.LOG.debug("{}: failed to get "
        + "ShortCircuitReplica. Cannot construct "
        + "BlockReaderLocal via {}", this, pathInfo.getPath());
    return null;
  }

  final BlockReaderLocal.Builder readerBuilder =
      new BlockReaderLocal.Builder(conf.getShortCircuitConf());
  return readerBuilder
      .setFilename(fileName)
      .setBlock(block)
      .setStartOffset(startOffset)
      .setShortCircuitReplica(replicaInfo.getReplica())
      .setVerifyChecksum(verifyChecksum)
      .setCachingStrategy(cachingStrategy)
      .setStorageType(storageType)
      .setTracer(tracer)
      .build();
}
/**
 * Test that a client which does not support short-circuit reads using
 * shared memory can talk with a server which supports it.
 *
 * The temporary socket directory is now closed at the end of the test, and
 * both it and the mini cluster are released in finally blocks so that a
 * failing step or assertion does not leak them into later tests.
 */
@Test
public void testShortCircuitReadFromClientWithoutShm() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadWithoutShm", sockDir);
    // The server keeps the unmodified settings; only the client conf is
    // changed after the cluster has been built.
    Configuration serverConf = new Configuration(clientConf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.setInt(
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromClientWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      // The test expects this client's cache to have no shm manager.
      Assert.assertNull(cache.getDfsClientShmManager());
    } finally {
      cluster.shutdown();
    }
  } finally {
    // Previously never closed, which left the socket directory behind.
    sockDir.close();
  }
}
/**
 * Test shutting down the ShortCircuitCache while there are things in it.
 *
 * The temporary socket directory is now closed at the end of the test, and
 * both it and the mini cluster are released in finally blocks so that a
 * failing step or assertion does not leak them into later tests.
 */
@Test
public void testShortCircuitCacheShutdown() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testShortCircuitCacheShutdown", sockDir);
    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
    Configuration serverConf = new Configuration(conf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back so the cache is populated before it is closed.
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      cache.close();
      // Closing the cache must also close the shm manager's socket watcher.
      Assert.assertTrue(cache.getDfsClientShmManager().
          getDomainSocketWatcher().isClosed());
    } finally {
      cluster.shutdown();
    }
  } finally {
    // Previously never closed, which left the socket directory behind.
    sockDir.close();
  }
}
/**
 * Tries to build a BlockReaderLocal for a short-circuit read of the
 * current block.
 *
 * @return the reader, or {@code null} when the domain socket path is not
 *         usable for short circuit or no replica could be obtained from
 *         the short-circuit cache
 * @throws InvalidToken when the replica info fetched from the cache carries
 *         an InvalidToken exception
 */
private BlockReader getBlockReaderLocal() throws InvalidToken {
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to construct a BlockReaderLocal " +
        "for short-circuit reads.");
  }
  // Resolve the path info lazily; later calls reuse the stored value.
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
        getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForShortCircuit()) {
    PerformanceAdvisory.LOG.debug(this + ": " + pathInfo + " is not " +
        "usable for short circuit; giving up on BlockReaderLocal.");
    return null;
  }
  ShortCircuitCache cache = clientContext.getShortCircuitCache();
  ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
      block.getBlockPoolId());
  ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
  InvalidToken exc = info.getInvalidTokenException();
  if (exc != null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": got InvalidToken exception while trying to " +
          "construct BlockReaderLocal via " + pathInfo.getPath());
    }
    throw exc;
  }
  if (info.getReplica() == null) {
    // Guard on the logger and level actually used for this message. The
    // previous guard tested LOG.isTraceEnabled(), which suppressed the
    // advisory unless trace logging was enabled on this class's logger.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": failed to get " +
          "ShortCircuitReplica. Cannot construct " +
          "BlockReaderLocal via " + pathInfo.getPath());
    }
    return null;
  }
  return new BlockReaderLocal.Builder(conf).
      setFilename(fileName).
      setBlock(block).
      setStartOffset(startOffset).
      setShortCircuitReplica(info.getReplica()).
      setVerifyChecksum(verifyChecksum).
      setCachingStrategy(cachingStrategy).
      setStorageType(storageType).
      build();
}
/**
 * @return the short-circuit cache held by this object
 */
public ShortCircuitCache getShortCircuitCache() {
  return this.shortCircuitCache;
}
/**
 * Test the case where we have multiple threads waiting on the
 * ShortCircuitCache delivering a certain ShortCircuitReplica.
 *
 * In this case, there should only be one call to
 * createShortCircuitReplicaInfo. This one replica should be shared
 * by all threads.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, and the latch is always released, so that a failure
 * part-way through neither leaks them nor leaves reader threads blocked.
 */
@Test(timeout=60000)
public void testMultipleWaitersOnShortCircuitCache() throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
  final AtomicBoolean testFailed = new AtomicBoolean(false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  BlockReaderFactory.createShortCircuitReplicaInfoCallback =
      new ShortCircuitCache.ShortCircuitReplicaCreator() {
        @Override
        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
          // Hold replica creation until the test releases the latch.
          Uninterruptibles.awaitUninterruptibly(latch);
          if (!creationIsBlocked.compareAndSet(true, false)) {
            Assert.fail("there were multiple calls to "
                + "createShortCircuitReplicaInfo. Only one was expected.");
          }
          return null;
        }
      };
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration conf = createShortCircuitConf(
        "testMultipleWaitersOnShortCircuitCache", sockDir);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem dfs = cluster.getFileSystem();
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADED;
      final int NUM_THREADS = 10;
      DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      Runnable readerRunnable = new Runnable() {
        @Override
        public void run() {
          try {
            byte contents[] =
                DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
            // A read can only finish after the callback has run once.
            Assert.assertFalse(creationIsBlocked.get());
            byte expected[] = DFSTestUtil.
                calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
            Assert.assertTrue(Arrays.equals(contents, expected));
          } catch (Throwable e) {
            // Assertion failures on reader threads are reported to the
            // main thread through this flag.
            LOG.error("readerRunnable error", e);
            testFailed.set(true);
          }
        }
      };
      Thread threads[] = new Thread[NUM_THREADS];
      for (int i = 0; i < NUM_THREADS; i++) {
        threads[i] = new Thread(readerRunnable);
        threads[i].start();
      }
      // Give the readers time to start before unblocking replica creation.
      Thread.sleep(500);
      latch.countDown();
      for (int i = 0; i < NUM_THREADS; i++) {
        Uninterruptibles.joinUninterruptibly(threads[i]);
      }
    } finally {
      // No-op on the success path (the count is already zero); on a failure
      // path this frees any reader still parked in the callback.
      latch.countDown();
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
  Assert.assertFalse(testFailed.get());
}
/**
 * Test that a client which supports short-circuit reads using
 * shared memory can fall back to not using shared memory when
 * the server doesn't support it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks so that a failing step or assertion does not leak them
 * into later tests.
 */
@Test
public void testShortCircuitReadFromServerWithoutShm() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadFromServerWithoutShm", sockDir);
    // Only the server-side copy gets the interrupt-check interval of 0.
    Configuration serverConf = new Configuration(clientConf);
    serverConf.setInt(
        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromServerWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      final DatanodeInfo datanode =
          new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
      cache.getDfsClientShmManager().visit(new Visitor() {
        @Override
        public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
            throws IOException {
          // Exactly one datanode entry, marked disabled, with no shared
          // memory segments in either list.
          Assert.assertEquals(1, info.size());
          PerDatanodeVisitorInfo vinfo = info.get(datanode);
          Assert.assertTrue(vinfo.disabled);
          Assert.assertEquals(0, vinfo.full.size());
          Assert.assertEquals(0, vinfo.notFull.size());
        }
      });
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}