/**
 * Attempts to build a {@code BlockReaderLocal} for a short-circuit read of
 * the current block.
 *
 * @return the constructed reader, or {@code null} when the path is not
 *         usable for short circuit or when the replica info obtained from
 *         the cache holds no replica.
 * @throws InvalidToken if the replica info obtained from the cache carries
 *         an invalid-token exception.
 */
private BlockReader getBlockReaderLocal() throws InvalidToken {
  LOG.trace("{}: trying to construct a BlockReaderLocal for "
      + "short-circuit  reads.", this);

  // Look up the path information only if it has not been set yet.
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory()
        .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
  }
  if (!pathInfo.getPathState().getUsableForShortCircuit()) {
    PerformanceAdvisory.LOG.debug(
        "{}: {} is not usable for short circuit; giving up on "
            + "BlockReaderLocal.",
        this, pathInfo);
    return null;
  }

  // This object is handed to the cache as the second fetchOrCreate argument.
  final ShortCircuitReplicaInfo replicaInfo =
      clientContext.getShortCircuitCache().fetchOrCreate(
          new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
          this);

  final InvalidToken tokenFailure = replicaInfo.getInvalidTokenException();
  if (tokenFailure != null) {
    LOG.trace(
        "{}: got InvalidToken exception while trying to construct "
            + "BlockReaderLocal via {}",
        this, pathInfo.getPath());
    throw tokenFailure;
  }

  if (replicaInfo.getReplica() == null) {
    PerformanceAdvisory.LOG.debug(
        "{}: failed to get ShortCircuitReplica. Cannot construct "
            + "BlockReaderLocal via {}",
        this, pathInfo.getPath());
    return null;
  }

  final BlockReaderLocal.Builder builder =
      new BlockReaderLocal.Builder(conf.getShortCircuitConf());
  return builder
      .setFilename(fileName)
      .setBlock(block)
      .setStartOffset(startOffset)
      .setShortCircuitReplica(replicaInfo.getReplica())
      .setVerifyChecksum(verifyChecksum)
      .setCachingStrategy(cachingStrategy)
      .setStorageType(storageType)
      .setTracer(tracer)
      .build();
}
/**
 * Attempts to build a {@code BlockReaderLocal} for a short-circuit read of
 * the current block.
 *
 * @return the constructed reader, or {@code null} when the path is not
 *         usable for short circuit or when the replica info obtained from
 *         the cache holds no replica.
 * @throws InvalidToken if the replica info obtained from the cache carries
 *         an invalid-token exception.
 */
private BlockReader getBlockReaderLocal() throws InvalidToken {
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to construct a BlockReaderLocal " +
        "for short-circuit reads.");
  }
  // Look up the path information only if it has not been set yet.
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
        getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForShortCircuit()) {
    // Guard on the logger that emits the message so the string is only
    // built when the advisory will actually be written.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": " + pathInfo + " is not " +
          "usable for short circuit; giving up on BlockReaderLocal.");
    }
    return null;
  }
  ShortCircuitCache cache = clientContext.getShortCircuitCache();
  ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
      block.getBlockPoolId());
  ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
  InvalidToken exc = info.getInvalidTokenException();
  if (exc != null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": got InvalidToken exception while trying to " +
          "construct BlockReaderLocal via " + pathInfo.getPath());
    }
    throw exc;
  }
  if (info.getReplica() == null) {
    // This advisory is a debug message on PerformanceAdvisory.LOG, so it
    // must be gated by that logger's debug level. Gating it on the class
    // logger's trace level suppressed it whenever trace was off.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": failed to get " +
          "ShortCircuitReplica. Cannot construct " +
          "BlockReaderLocal via " + pathInfo.getPath());
    }
    return null;
  }
  return new BlockReaderLocal.Builder(conf).
      setFilename(fileName).
      setBlock(block).
      setStartOffset(startOffset).
      setShortCircuitReplica(info.getReplica()).
      setVerifyChecksum(verifyChecksum).
      setCachingStrategy(cachingStrategy).
      setStorageType(storageType).
      build();
}
/**
 * Test the case where we have multiple threads waiting on the
 * ShortCircuitCache delivering a certain ShortCircuitReplica.
 *
 * In this case, there should only be one call to
 * createShortCircuitReplicaInfo. This one replica should be shared
 * by all threads.
 *
 * The mini cluster and the temporary socket directory are released in a
 * finally block so that a failure part-way through the test does not leak
 * them.
 */
@Test(timeout=60000)
public void testMultipleWaitersOnShortCircuitCache() throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
  final AtomicBoolean testFailed = new AtomicBoolean(false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  BlockReaderFactory.createShortCircuitReplicaInfoCallback =
      new ShortCircuitCache.ShortCircuitReplicaCreator() {
        @Override
        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
          // Hold the creator until the main thread releases the latch.
          Uninterruptibles.awaitUninterruptibly(latch);
          // Only the first caller may flip the flag; any later call means
          // the creator was invoked more than once.
          if (!creationIsBlocked.compareAndSet(true, false)) {
            Assert.fail("there were multiple calls to "
                + "createShortCircuitReplicaInfo. Only one was expected.");
          }
          return null;
        }
      };
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  MiniDFSCluster cluster = null;
  try {
    Configuration conf = createShortCircuitConf(
        "testMultipleWaitersOnShortCircuitCache", sockDir);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final String TEST_FILE = "/test_file";
    final int TEST_FILE_LEN = 4000;
    final int SEED = 0xFADED;
    final int NUM_THREADS = 10;
    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
        (short)1, SEED);
    Runnable readerRunnable = new Runnable() {
      @Override
      public void run() {
        try {
          byte[] contents = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
          // By the time a read completes, the creator must have run.
          Assert.assertFalse(creationIsBlocked.get());
          byte[] expected = DFSTestUtil.
              calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
          Assert.assertTrue(Arrays.equals(contents, expected));
        } catch (Throwable e) {
          // Failures on reader threads are recorded here and asserted on
          // the main thread at the end of the test.
          LOG.error("readerRunnable error", e);
          testFailed.set(true);
        }
      }
    };
    Thread[] threads = new Thread[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++) {
      threads[i] = new Thread(readerRunnable);
      threads[i].start();
    }
    // Presumably gives the readers time to queue up behind the blocked
    // creator before it is released.
    Thread.sleep(500);
    latch.countDown();
    for (int i = 0; i < NUM_THREADS; i++) {
      Uninterruptibles.joinUninterruptibly(threads[i]);
    }
  } finally {
    // Release the creator on failure paths too, so nothing stays parked on
    // the latch. countDown() does nothing once the count is already zero.
    latch.countDown();
    try {
      if (cluster != null) {
        cluster.shutdown();
      }
    } finally {
      sockDir.close();
    }
  }
  Assert.assertFalse(testFailed.get());
}