Java 类org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache 实例源码

项目:hadoop    文件:ClientContext.java   
private ClientContext(String name, Conf conf) {
  this.name = name;
  this.confString = confAsString(conf);
  this.shortCircuitCache = new ShortCircuitCache(
      conf.shortCircuitStreamsCacheSize,
      conf.shortCircuitStreamsCacheExpiryMs,
      conf.shortCircuitMmapCacheSize,
      conf.shortCircuitMmapCacheExpiryMs,
      conf.shortCircuitMmapCacheRetryTimeout,
      conf.shortCircuitCacheStaleThresholdMs,
      conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
  this.peerCache =
        new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
  this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs);
  this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
  this.domainSocketFactory = new DomainSocketFactory(conf);

  this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
}
项目:aliyun-oss-hadoop-fs    文件:ClientContext.java   
/**
 * Creates a client context named {@code name}. Most components are built
 * from the short-circuit section of {@code conf}; the byte array manager is
 * built from {@code conf.getWriteByteArrayManagerConf()}.
 *
 * @param name the name recorded on this context
 * @param conf the DFS client configuration
 */
private ClientContext(String name, DfsClientConf conf) {
  final ShortCircuitConf shortCircuitConf = conf.getShortCircuitConf();

  this.name = name;
  this.confString = shortCircuitConf.confAsString();

  final ShortCircuitCache scCache =
      ShortCircuitCache.fromConf(shortCircuitConf);
  this.shortCircuitCache = scCache;

  final PeerCache peers = new PeerCache(
      shortCircuitConf.getSocketCacheCapacity(),
      shortCircuitConf.getSocketCacheExpiry());
  this.peerCache = peers;

  final KeyProviderCache keyProviders =
      new KeyProviderCache(shortCircuitConf.getKeyProviderCacheExpiryMs());
  this.keyProviderCache = keyProviders;

  this.useLegacyBlockReaderLocal =
      shortCircuitConf.isUseLegacyBlockReaderLocal();

  final DomainSocketFactory socketFactory =
      new DomainSocketFactory(shortCircuitConf);
  this.domainSocketFactory = socketFactory;

  final ByteArrayManager arrays =
      ByteArrayManager.newInstance(conf.getWriteByteArrayManagerConf());
  this.byteArrayManager = arrays;
}
项目:big-c    文件:ClientContext.java   
private ClientContext(String name, Conf conf) {
  this.name = name;
  this.confString = confAsString(conf);
  this.shortCircuitCache = new ShortCircuitCache(
      conf.shortCircuitStreamsCacheSize,
      conf.shortCircuitStreamsCacheExpiryMs,
      conf.shortCircuitMmapCacheSize,
      conf.shortCircuitMmapCacheExpiryMs,
      conf.shortCircuitMmapCacheRetryTimeout,
      conf.shortCircuitCacheStaleThresholdMs,
      conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
  this.peerCache =
        new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
  this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs);
  this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
  this.domainSocketFactory = new DomainSocketFactory(conf);

  this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ClientContext.java   
private ClientContext(String name, Conf conf) {
  this.name = name;
  this.confString = confAsString(conf);
  this.shortCircuitCache = new ShortCircuitCache(
      conf.shortCircuitStreamsCacheSize,
      conf.shortCircuitStreamsCacheExpiryMs,
      conf.shortCircuitMmapCacheSize,
      conf.shortCircuitMmapCacheExpiryMs,
      conf.shortCircuitMmapCacheRetryTimeout,
      conf.shortCircuitCacheStaleThresholdMs,
      conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
  this.peerCache =
        new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
  this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs);
  this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
  this.domainSocketFactory = new DomainSocketFactory(conf);

  this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
}
项目:FlexMap    文件:ClientContext.java   
private ClientContext(String name, Conf conf) {
  this.name = name;
  this.confString = confAsString(conf);
  this.shortCircuitCache = new ShortCircuitCache(
      conf.shortCircuitStreamsCacheSize,
      conf.shortCircuitStreamsCacheExpiryMs,
      conf.shortCircuitMmapCacheSize,
      conf.shortCircuitMmapCacheExpiryMs,
      conf.shortCircuitMmapCacheRetryTimeout,
      conf.shortCircuitCacheStaleThresholdMs,
      conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
  this.peerCache =
        new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
  this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
  this.domainSocketFactory = new DomainSocketFactory(conf);

  this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
}
项目:hadoop    文件:TestEnhancedByteBufferAccess.java   
/**
 * Repeatedly visits {@code cache} until the slot of the replica for
 * {@code block} reports the expected anchorable/anchored flags.
 *
 * On every visit the outstanding mmap count is asserted to equal
 * {@code expectedOutstandingMmaps} and the replica is asserted to be
 * present; only the slot flags are waited for.
 *
 * @param cache the short-circuit cache to inspect
 * @param block the block whose replica is looked up
 * @param expectedIsAnchorable the awaited value of {@code Slot#isAnchorable}
 * @param expectedIsAnchored the awaited value of {@code Slot#isAnchored}
 * @param expectedOutstandingMmaps the mmap count asserted on every visit
 * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
 */
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
    final ExtendedBlock block, final boolean expectedIsAnchorable,
    final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
    throws Exception {
  final Supplier<Boolean> anchorStatusReached = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      final MutableBoolean matched = new MutableBoolean(false);
      final CacheVisitor checker = new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
          final ExtendedBlockId blockKey =
              ExtendedBlockId.fromExtendedBlock(block);
          final ShortCircuitReplica found = replicas.get(blockKey);
          Assert.assertNotNull(found);
          final Slot shmSlot = found.getSlot();
          // Both flags already match: report success for this poll.
          if ((expectedIsAnchorable == shmSlot.isAnchorable()) &&
              (expectedIsAnchored == shmSlot.isAnchored())) {
            matched.setValue(true);
            return;
          }
          LOG.info("replica " + found + " has isAnchorable = "
              + shmSlot.isAnchorable()
              + ", isAnchored = " + shmSlot.isAnchored()
              + ".  Waiting for isAnchorable = " + expectedIsAnchorable
              + ", isAnchored = " + expectedIsAnchored);
        }
      };
      cache.accept(checker);
      return matched.toBoolean();
    }
  };
  // 10 and 60000 are presumably the poll interval and the timeout in
  // milliseconds; confirm against GenericTestUtils.waitFor.
  GenericTestUtils.waitFor(anchorStatusReached, 10, 60000);
}
项目:hadoop    文件:TestBlockReaderFactory.java   
/**
 * Test that a client which does not support short-circuit reads using
 * shared memory can talk with a server which supports it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind.
 */
@Test
public void testShortCircuitReadFromClientWithoutShm() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadWithoutShm", sockDir);
    // The server configuration is copied before the client-only overrides
    // below are applied, so the server does not see them.
    final Configuration serverConf = new Configuration(clientConf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.setInt(
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromClientWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      // This client is expected to have no shared-memory manager.
      Assert.assertNull(cache.getDfsClientShmManager());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:hadoop    文件:TestBlockReaderFactory.java   
/**
 * Test shutting down the ShortCircuitCache while there are things in it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind.
 */
@Test
public void testShortCircuitCacheShutdown() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration conf = createShortCircuitConf(
        "testShortCircuitCacheShutdown", sockDir);
    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
    final Configuration serverConf = new Configuration(conf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed; the read happens before the cache is closed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      cache.close();
      // Closing the cache must also have closed the domain socket watcher
      // of its shared-memory manager.
      Assert.assertTrue(cache.getDfsClientShmManager().
          getDomainSocketWatcher().isClosed());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:hadoop    文件:BlockReaderTestUtil.java   
/**
 * Sets the level of the loggers named after {@code BlockReaderFactory},
 * {@code ShortCircuitCache}, {@code ShortCircuitReplica} and
 * {@code BlockReaderLocal} to {@code Level.TRACE}, in that order.
 */
public static void enableBlockReaderFactoryTracing() {
  final Class<?>[] tracedClasses = {
      BlockReaderFactory.class,
      ShortCircuitCache.class,
      ShortCircuitReplica.class,
      BlockReaderLocal.class
  };
  for (Class<?> traced : tracedClasses) {
    LogManager.getLogger(traced.getName()).setLevel(Level.TRACE);
  }
}
项目:aliyun-oss-hadoop-fs    文件:BlockReaderFactory.java   
/**
 * Tries to build a {@code BlockReaderLocal} for the current block from a
 * replica obtained through the client's short-circuit cache.
 *
 * @return the new reader, or {@code null} when the path is not usable for
 *         short-circuit reads or no replica could be obtained
 * @throws InvalidToken if the replica lookup recorded an invalid-token
 *         exception
 */
private BlockReader getBlockReaderLocal() throws InvalidToken {
  LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
      + " reads.", this);

  // Resolve the path info only once; later calls reuse the stored value.
  if (pathInfo == null) {
    pathInfo = clientContext
        .getDomainSocketFactory()
        .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
  }

  final boolean usable = pathInfo.getPathState().getUsableForShortCircuit();
  if (!usable) {
    PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
            "giving up on BlockReaderLocal.", this, pathInfo);
    return null;
  }

  final ShortCircuitCache scCache = clientContext.getShortCircuitCache();
  final ExtendedBlockId blockKey =
      new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
  final ShortCircuitReplicaInfo replicaInfo =
      scCache.fetchOrCreate(blockKey, this);

  final InvalidToken tokenError = replicaInfo.getInvalidTokenException();
  if (tokenError != null) {
    LOG.trace("{}: got InvalidToken exception while trying to construct "
        + "BlockReaderLocal via {}", this, pathInfo.getPath());
    throw tokenError;
  }

  if (replicaInfo.getReplica() == null) {
    PerformanceAdvisory.LOG.debug("{}: failed to get " +
        "ShortCircuitReplica. Cannot construct " +
        "BlockReaderLocal via {}", this, pathInfo.getPath());
    return null;
  }

  return new BlockReaderLocal.Builder(conf.getShortCircuitConf())
      .setFilename(fileName)
      .setBlock(block)
      .setStartOffset(startOffset)
      .setShortCircuitReplica(replicaInfo.getReplica())
      .setVerifyChecksum(verifyChecksum)
      .setCachingStrategy(cachingStrategy)
      .setStorageType(storageType)
      .setTracer(tracer)
      .build();
}
项目:aliyun-oss-hadoop-fs    文件:TestEnhancedByteBufferAccess.java   
/**
 * Repeatedly visits {@code cache} until the slot of the replica for
 * {@code block} reports the expected anchorable/anchored flags.
 *
 * On every visit the outstanding mmap count is asserted to equal
 * {@code expectedOutstandingMmaps} and the replica is asserted to be
 * present; only the slot flags are waited for.
 *
 * @param cache the short-circuit cache to inspect
 * @param block the block whose replica is looked up
 * @param expectedIsAnchorable the awaited value of {@code Slot#isAnchorable}
 * @param expectedIsAnchored the awaited value of {@code Slot#isAnchored}
 * @param expectedOutstandingMmaps the mmap count asserted on every visit
 * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
 */
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
    final ExtendedBlock block, final boolean expectedIsAnchorable,
    final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
    throws Exception {
  final Supplier<Boolean> anchorStatusReached = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      final MutableBoolean matched = new MutableBoolean(false);
      final CacheVisitor checker = new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
          final ExtendedBlockId blockKey =
              ExtendedBlockId.fromExtendedBlock(block);
          final ShortCircuitReplica found = replicas.get(blockKey);
          Assert.assertNotNull(found);
          final Slot shmSlot = found.getSlot();
          // Both flags already match: report success for this poll.
          if ((expectedIsAnchorable == shmSlot.isAnchorable()) &&
              (expectedIsAnchored == shmSlot.isAnchored())) {
            matched.setValue(true);
            return;
          }
          LOG.info("replica " + found + " has isAnchorable = "
              + shmSlot.isAnchorable()
              + ", isAnchored = " + shmSlot.isAnchored()
              + ".  Waiting for isAnchorable = " + expectedIsAnchorable
              + ", isAnchored = " + expectedIsAnchored);
        }
      };
      cache.accept(checker);
      return matched.toBoolean();
    }
  };
  // 10 and 60000 are presumably the poll interval and the timeout in
  // milliseconds; confirm against GenericTestUtils.waitFor.
  GenericTestUtils.waitFor(anchorStatusReached, 10, 60000);
}
项目:aliyun-oss-hadoop-fs    文件:TestBlockReaderFactory.java   
/**
 * Test that a client which does not support short-circuit reads using
 * shared memory can talk with a server which supports it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind.
 */
@Test
public void testShortCircuitReadFromClientWithoutShm() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadWithoutShm", sockDir);
    // The server configuration is copied before the client-only overrides
    // below are applied, so the server does not see them.
    final Configuration serverConf = new Configuration(clientConf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.setInt(
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromClientWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      // This client is expected to have no shared-memory manager.
      Assert.assertNull(cache.getDfsClientShmManager());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestBlockReaderFactory.java   
/**
 * Test shutting down the ShortCircuitCache while there are things in it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind.
 */
@Test
public void testShortCircuitCacheShutdown() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration conf = createShortCircuitConf(
        "testShortCircuitCacheShutdown", sockDir);
    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
    final Configuration serverConf = new Configuration(conf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed; the read happens before the cache is closed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      cache.close();
      // Closing the cache must also have closed the domain socket watcher
      // of its shared-memory manager.
      Assert.assertTrue(cache.getDfsClientShmManager().
          getDomainSocketWatcher().isClosed());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:aliyun-oss-hadoop-fs    文件:BlockReaderTestUtil.java   
/**
 * Sets the level of the loggers named after {@code BlockReaderFactory},
 * {@code ShortCircuitCache}, {@code ShortCircuitReplica} and
 * {@code BlockReaderLocal} to {@code Level.TRACE}, in that order.
 */
public static void enableBlockReaderFactoryTracing() {
  final Class<?>[] tracedClasses = {
      BlockReaderFactory.class,
      ShortCircuitCache.class,
      ShortCircuitReplica.class,
      BlockReaderLocal.class
  };
  for (Class<?> traced : tracedClasses) {
    LogManager.getLogger(traced.getName()).setLevel(Level.TRACE);
  }
}
项目:big-c    文件:TestEnhancedByteBufferAccess.java   
/**
 * Repeatedly visits {@code cache} until the slot of the replica for
 * {@code block} reports the expected anchorable/anchored flags.
 *
 * On every visit the outstanding mmap count is asserted to equal
 * {@code expectedOutstandingMmaps} and the replica is asserted to be
 * present; only the slot flags are waited for.
 *
 * @param cache the short-circuit cache to inspect
 * @param block the block whose replica is looked up
 * @param expectedIsAnchorable the awaited value of {@code Slot#isAnchorable}
 * @param expectedIsAnchored the awaited value of {@code Slot#isAnchored}
 * @param expectedOutstandingMmaps the mmap count asserted on every visit
 * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
 */
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
    final ExtendedBlock block, final boolean expectedIsAnchorable,
    final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
    throws Exception {
  final Supplier<Boolean> anchorStatusReached = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      final MutableBoolean matched = new MutableBoolean(false);
      final CacheVisitor checker = new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
          final ExtendedBlockId blockKey =
              ExtendedBlockId.fromExtendedBlock(block);
          final ShortCircuitReplica found = replicas.get(blockKey);
          Assert.assertNotNull(found);
          final Slot shmSlot = found.getSlot();
          // Both flags already match: report success for this poll.
          if ((expectedIsAnchorable == shmSlot.isAnchorable()) &&
              (expectedIsAnchored == shmSlot.isAnchored())) {
            matched.setValue(true);
            return;
          }
          LOG.info("replica " + found + " has isAnchorable = "
              + shmSlot.isAnchorable()
              + ", isAnchored = " + shmSlot.isAnchored()
              + ".  Waiting for isAnchorable = " + expectedIsAnchorable
              + ", isAnchored = " + expectedIsAnchored);
        }
      };
      cache.accept(checker);
      return matched.toBoolean();
    }
  };
  // 10 and 60000 are presumably the poll interval and the timeout in
  // milliseconds; confirm against GenericTestUtils.waitFor.
  GenericTestUtils.waitFor(anchorStatusReached, 10, 60000);
}
项目:big-c    文件:TestBlockReaderFactory.java   
/**
 * Test that a client which does not support short-circuit reads using
 * shared memory can talk with a server which supports it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind.
 */
@Test
public void testShortCircuitReadFromClientWithoutShm() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadWithoutShm", sockDir);
    // The server configuration is copied before the client-only overrides
    // below are applied, so the server does not see them.
    final Configuration serverConf = new Configuration(clientConf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.setInt(
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromClientWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      // This client is expected to have no shared-memory manager.
      Assert.assertNull(cache.getDfsClientShmManager());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:big-c    文件:TestBlockReaderFactory.java   
/**
 * Test shutting down the ShortCircuitCache while there are things in it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind.
 */
@Test
public void testShortCircuitCacheShutdown() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration conf = createShortCircuitConf(
        "testShortCircuitCacheShutdown", sockDir);
    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
    final Configuration serverConf = new Configuration(conf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed; the read happens before the cache is closed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      cache.close();
      // Closing the cache must also have closed the domain socket watcher
      // of its shared-memory manager.
      Assert.assertTrue(cache.getDfsClientShmManager().
          getDomainSocketWatcher().isClosed());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:big-c    文件:BlockReaderTestUtil.java   
/**
 * Sets the level of the loggers named after {@code BlockReaderFactory},
 * {@code ShortCircuitCache}, {@code ShortCircuitReplica} and
 * {@code BlockReaderLocal} to {@code Level.TRACE}, in that order.
 */
public static void enableBlockReaderFactoryTracing() {
  final Class<?>[] tracedClasses = {
      BlockReaderFactory.class,
      ShortCircuitCache.class,
      ShortCircuitReplica.class,
      BlockReaderLocal.class
  };
  for (Class<?> traced : tracedClasses) {
    LogManager.getLogger(traced.getName()).setLevel(Level.TRACE);
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestEnhancedByteBufferAccess.java   
/**
 * Repeatedly visits {@code cache} until the slot of the replica for
 * {@code block} reports the expected anchorable/anchored flags.
 *
 * On every visit the outstanding mmap count is asserted to equal
 * {@code expectedOutstandingMmaps} and the replica is asserted to be
 * present; only the slot flags are waited for.
 *
 * @param cache the short-circuit cache to inspect
 * @param block the block whose replica is looked up
 * @param expectedIsAnchorable the awaited value of {@code Slot#isAnchorable}
 * @param expectedIsAnchored the awaited value of {@code Slot#isAnchored}
 * @param expectedOutstandingMmaps the mmap count asserted on every visit
 * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
 */
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
    final ExtendedBlock block, final boolean expectedIsAnchorable,
    final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
    throws Exception {
  final Supplier<Boolean> anchorStatusReached = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      final MutableBoolean matched = new MutableBoolean(false);
      final CacheVisitor checker = new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
          final ExtendedBlockId blockKey =
              ExtendedBlockId.fromExtendedBlock(block);
          final ShortCircuitReplica found = replicas.get(blockKey);
          Assert.assertNotNull(found);
          final Slot shmSlot = found.getSlot();
          // Both flags already match: report success for this poll.
          if ((expectedIsAnchorable == shmSlot.isAnchorable()) &&
              (expectedIsAnchored == shmSlot.isAnchored())) {
            matched.setValue(true);
            return;
          }
          LOG.info("replica " + found + " has isAnchorable = "
              + shmSlot.isAnchorable()
              + ", isAnchored = " + shmSlot.isAnchored()
              + ".  Waiting for isAnchorable = " + expectedIsAnchorable
              + ", isAnchored = " + expectedIsAnchored);
        }
      };
      cache.accept(checker);
      return matched.toBoolean();
    }
  };
  // 10 and 60000 are presumably the poll interval and the timeout in
  // milliseconds; confirm against GenericTestUtils.waitFor.
  GenericTestUtils.waitFor(anchorStatusReached, 10, 60000);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestBlockReaderFactory.java   
/**
 * Test that a client which does not support short-circuit reads using
 * shared memory can talk with a server which supports it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind. The socket directory was previously
 * never closed by this test.
 */
@Test
public void testShortCircuitReadFromClientWithoutShm() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadWithoutShm", sockDir);
    // The server configuration is copied before the client-only overrides
    // below are applied, so the server does not see them.
    final Configuration serverConf = new Configuration(clientConf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.setInt(
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromClientWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      // This client is expected to have no shared-memory manager.
      Assert.assertNull(cache.getDfsClientShmManager());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestBlockReaderFactory.java   
/**
 * Test shutting down the ShortCircuitCache while there are things in it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind. The socket directory was previously
 * never closed by this test.
 */
@Test
public void testShortCircuitCacheShutdown() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration conf = createShortCircuitConf(
        "testShortCircuitCacheShutdown", sockDir);
    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
    final Configuration serverConf = new Configuration(conf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed; the read happens before the cache is closed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      cache.close();
      // Closing the cache must also have closed the domain socket watcher
      // of its shared-memory manager.
      Assert.assertTrue(cache.getDfsClientShmManager().
          getDomainSocketWatcher().isClosed());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:BlockReaderTestUtil.java   
/**
 * Sets the level of the loggers named after {@code BlockReaderFactory},
 * {@code ShortCircuitCache}, {@code ShortCircuitReplica} and
 * {@code BlockReaderLocal} to {@code Level.TRACE}, in that order.
 */
public static void enableBlockReaderFactoryTracing() {
  final Class<?>[] tracedClasses = {
      BlockReaderFactory.class,
      ShortCircuitCache.class,
      ShortCircuitReplica.class,
      BlockReaderLocal.class
  };
  for (Class<?> traced : tracedClasses) {
    LogManager.getLogger(traced.getName()).setLevel(Level.TRACE);
  }
}
项目:FlexMap    文件:TestEnhancedByteBufferAccess.java   
/**
 * Repeatedly visits {@code cache} until the slot of the replica for
 * {@code block} reports the expected anchorable/anchored flags.
 *
 * On every visit the outstanding mmap count is asserted to equal
 * {@code expectedOutstandingMmaps} and the replica is asserted to be
 * present; only the slot flags are waited for.
 *
 * @param cache the short-circuit cache to inspect
 * @param block the block whose replica is looked up
 * @param expectedIsAnchorable the awaited value of {@code Slot#isAnchorable}
 * @param expectedIsAnchored the awaited value of {@code Slot#isAnchored}
 * @param expectedOutstandingMmaps the mmap count asserted on every visit
 * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
 */
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
    final ExtendedBlock block, final boolean expectedIsAnchorable,
    final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
    throws Exception {
  final Supplier<Boolean> anchorStatusReached = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      final MutableBoolean matched = new MutableBoolean(false);
      final CacheVisitor checker = new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
          final ExtendedBlockId blockKey =
              ExtendedBlockId.fromExtendedBlock(block);
          final ShortCircuitReplica found = replicas.get(blockKey);
          Assert.assertNotNull(found);
          final Slot shmSlot = found.getSlot();
          // Both flags already match: report success for this poll.
          if ((expectedIsAnchorable == shmSlot.isAnchorable()) &&
              (expectedIsAnchored == shmSlot.isAnchored())) {
            matched.setValue(true);
            return;
          }
          LOG.info("replica " + found + " has isAnchorable = "
              + shmSlot.isAnchorable()
              + ", isAnchored = " + shmSlot.isAnchored()
              + ".  Waiting for isAnchorable = " + expectedIsAnchorable
              + ", isAnchored = " + expectedIsAnchored);
        }
      };
      cache.accept(checker);
      return matched.toBoolean();
    }
  };
  // 10 and 60000 are presumably the poll interval and the timeout in
  // milliseconds; confirm against GenericTestUtils.waitFor.
  GenericTestUtils.waitFor(anchorStatusReached, 10, 60000);
}
项目:FlexMap    文件:TestBlockReaderFactory.java   
/**
 * Test that a client which does not support short-circuit reads using
 * shared memory can talk with a server which supports it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind. The socket directory was previously
 * never closed by this test.
 */
@Test
public void testShortCircuitReadFromClientWithoutShm() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadWithoutShm", sockDir);
    // The server configuration is copied before the client-only overrides
    // below are applied, so the server does not see them.
    final Configuration serverConf = new Configuration(clientConf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.setInt(
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromClientWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      // This client is expected to have no shared-memory manager.
      Assert.assertNull(cache.getDfsClientShmManager());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:FlexMap    文件:TestBlockReaderFactory.java   
/**
 * Test shutting down the ShortCircuitCache while there are things in it.
 *
 * The mini cluster and the temporary socket directory are released in
 * finally blocks, so a failed assertion or an exception part-way through
 * the test does not leave them behind. The socket directory was previously
 * never closed by this test.
 */
@Test
public void testShortCircuitCacheShutdown() throws Exception {
  final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    final Configuration conf = createShortCircuitConf(
        "testShortCircuitCacheShutdown", sockDir);
    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
    final Configuration serverConf = new Configuration(conf);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      // Read the file back and compare it with the bytes regenerated from
      // the same seed; the read happens before the cache is closed.
      byte[] contents = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte[] expected = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      cache.close();
      // Closing the cache must also have closed the domain socket watcher
      // of its shared-memory manager.
      Assert.assertTrue(cache.getDfsClientShmManager().
          getDomainSocketWatcher().isClosed());
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:FlexMap    文件:BlockReaderTestUtil.java   
/**
 * Sets the level of the loggers named after {@code BlockReaderFactory},
 * {@code ShortCircuitCache}, {@code ShortCircuitReplica} and
 * {@code BlockReaderLocal} to {@code Level.TRACE}, in that order.
 */
public static void enableBlockReaderFactoryTracing() {
  final Class<?>[] tracedClasses = {
      BlockReaderFactory.class,
      ShortCircuitCache.class,
      ShortCircuitReplica.class,
      BlockReaderLocal.class
  };
  for (Class<?> traced : tracedClasses) {
    LogManager.getLogger(traced.getName()).setLevel(Level.TRACE);
  }
}
项目:hadoop    文件:BlockReaderFactory.java   
/**
 * Attempts to construct a {@link BlockReaderLocal} for a short-circuit
 * read of the current block.
 *
 * @return a BlockReaderLocal built on the replica returned by the
 *         ShortCircuitCache, or null when the path is not usable for
 *         short-circuit reads or the cache returned no replica.
 * @throws InvalidToken if the replica info fetched from the cache carries
 *         an InvalidToken exception.
 */
private BlockReader getBlockReaderLocal() throws InvalidToken {
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to construct a BlockReaderLocal " +
        "for short-circuit reads.");
  }
  // Resolve the path info only if it has not been set already.
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
                    getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForShortCircuit()) {
    // Skip building the message when the advisory logger would drop it.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": " + pathInfo + " is not " +
          "usable for short circuit; giving up on BlockReaderLocal.");
    }
    return null;
  }
  ShortCircuitCache cache = clientContext.getShortCircuitCache();
  ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
  ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
  InvalidToken exc = info.getInvalidTokenException();
  if (exc != null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": got InvalidToken exception while trying to " +
          "construct BlockReaderLocal via " + pathInfo.getPath());
    }
    throw exc;
  }
  if (info.getReplica() == null) {
    // Guard on the logger that is actually written to. The previous guard
    // (LOG.isTraceEnabled()) suppressed this debug-level advisory unless
    // trace logging was enabled on the factory's own logger.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": failed to get " +
          "ShortCircuitReplica. Cannot construct " +
          "BlockReaderLocal via " + pathInfo.getPath());
    }
    return null;
  }
  return new BlockReaderLocal.Builder(conf).
      setFilename(fileName).
      setBlock(block).
      setStartOffset(startOffset).
      setShortCircuitReplica(info.getReplica()).
      setVerifyChecksum(verifyChecksum).
      setCachingStrategy(cachingStrategy).
      setStorageType(storageType).
      build();
}
项目:hadoop    文件:ClientContext.java   
/**
 * Returns the ShortCircuitCache held by this ClientContext.
 *
 * @return the value of the shortCircuitCache field.
 */
public ShortCircuitCache getShortCircuitCache() {
  return this.shortCircuitCache;
}
项目:hadoop    文件:TestBlockReaderFactory.java   
/**
 * Verifies that several threads waiting on the ShortCircuitCache for the
 * same ShortCircuitReplica result in only one call to
 * createShortCircuitReplicaInfo.
 *
 * The replica-creation callback blocks on a latch while the reader threads
 * are started. After the latch is released, any invocation of the callback
 * beyond the first one fails the test. Each reader also checks that the
 * callback has run and that the bytes it read match the seeded contents.
 */
@Test(timeout=60000)
public void testMultipleWaitersOnShortCircuitCache()
    throws Exception {
  final String TEST_FILE = "/test_file";
  final int TEST_FILE_LEN = 4000;
  final int SEED = 0xFADED;
  final int NUM_THREADS = 10;
  final CountDownLatch creationGate = new CountDownLatch(1);
  final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
  final AtomicBoolean readerFailed = new AtomicBoolean(false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  BlockReaderFactory.createShortCircuitReplicaInfoCallback =
    new ShortCircuitCache.ShortCircuitReplicaCreator() {
      @Override
      public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
        // Hold every creation attempt until the main thread opens the gate.
        Uninterruptibles.awaitUninterruptibly(creationGate);
        final boolean firstCall =
            creationIsBlocked.compareAndSet(true, false);
        if (!firstCall) {
          Assert.fail("there were multiple calls to "
              + "createShortCircuitReplicaInfo.  Only one was expected.");
        }
        return null;
      }
    };
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  Configuration conf = createShortCircuitConf(
      "testMultipleWaitersOnShortCircuitCache", sockDir);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  final DistributedFileSystem dfs = cluster.getFileSystem();
  final Path testPath = new Path(TEST_FILE);
  DFSTestUtil.createFile(dfs, testPath, TEST_FILE_LEN, (short)1, SEED);
  final Runnable readerTask = new Runnable() {
    @Override
    public void run() {
      try {
        final byte[] actual = DFSTestUtil.readFileBuffer(dfs, testPath);
        Assert.assertFalse(creationIsBlocked.get());
        final byte[] wanted =
            DFSTestUtil.calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
        Assert.assertTrue(Arrays.equals(actual, wanted));
      } catch (Throwable e) {
        // Failures in reader threads are recorded and asserted on later.
        LOG.error("readerRunnable error", e);
        readerFailed.set(true);
      }
    }
  };
  final Thread[] readers = new Thread[NUM_THREADS];
  for (int idx = 0; idx < readers.length; idx++) {
    final Thread reader = new Thread(readerTask);
    readers[idx] = reader;
    reader.start();
  }
  Thread.sleep(500);
  creationGate.countDown();
  for (Thread reader : readers) {
    Uninterruptibles.joinUninterruptibly(reader);
  }
  cluster.shutdown();
  sockDir.close();
  Assert.assertFalse(readerFailed.get());
}
项目:hadoop    文件:TestBlockReaderFactory.java   
/**
 * Test that a client which supports short-circuit reads using
 * shared memory can fall back to not using shared memory when
 * the server doesn't support it.
 *
 * The server configuration is a copy of the client configuration with
 * DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS set to 0.
 * After a successful read, the client's DfsClientShmManager must hold
 * exactly one datanode entry, marked disabled, with no shared memory
 * segments in either its full or notFull collections.
 *
 * The cluster and the temporary socket directory are released in finally
 * blocks so that a failing step does not leak them.
 */
@Test
public void testShortCircuitReadFromServerWithoutShm() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadFromServerWithoutShm", sockDir);
    Configuration serverConf = new Configuration(clientConf);
    // NOTE(review): a value of 0 presumably turns off shared memory
    // support on the DataNode side -- confirm against the server code.
    serverConf.setInt(
        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromServerWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      final DatanodeInfo datanode =
          new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
      cache.getDfsClientShmManager().visit(new Visitor() {
        @Override
        public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
            throws IOException {
          Assert.assertEquals(1,  info.size());
          PerDatanodeVisitorInfo vinfo = info.get(datanode);
          // Fail with an assertion rather than a NullPointerException
          // when the single entry belongs to a different datanode.
          Assert.assertNotNull(vinfo);
          Assert.assertTrue(vinfo.disabled);
          Assert.assertEquals(0, vinfo.full.size());
          Assert.assertEquals(0, vinfo.notFull.size());
        }
      });
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:aliyun-oss-hadoop-fs    文件:ClientContext.java   
/**
 * Returns the ShortCircuitCache held by this ClientContext.
 *
 * @return the value of the shortCircuitCache field.
 */
public ShortCircuitCache getShortCircuitCache() {
  return this.shortCircuitCache;
}
项目:aliyun-oss-hadoop-fs    文件:TestBlockReaderFactory.java   
/**
 * Verifies that several threads waiting on the ShortCircuitCache for the
 * same ShortCircuitReplica result in only one call to
 * createShortCircuitReplicaInfo.
 *
 * The replica-creation callback blocks on a latch while the reader threads
 * are started. After the latch is released, any invocation of the callback
 * beyond the first one fails the test. Each reader also checks that the
 * callback has run and that the bytes it read match the seeded contents.
 */
@Test(timeout=60000)
public void testMultipleWaitersOnShortCircuitCache()
    throws Exception {
  final String TEST_FILE = "/test_file";
  final int TEST_FILE_LEN = 4000;
  final int SEED = 0xFADED;
  final int NUM_THREADS = 10;
  final CountDownLatch creationGate = new CountDownLatch(1);
  final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
  final AtomicBoolean readerFailed = new AtomicBoolean(false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  BlockReaderFactory.createShortCircuitReplicaInfoCallback =
    new ShortCircuitCache.ShortCircuitReplicaCreator() {
      @Override
      public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
        // Hold every creation attempt until the main thread opens the gate.
        Uninterruptibles.awaitUninterruptibly(creationGate);
        final boolean firstCall =
            creationIsBlocked.compareAndSet(true, false);
        if (!firstCall) {
          Assert.fail("there were multiple calls to "
              + "createShortCircuitReplicaInfo.  Only one was expected.");
        }
        return null;
      }
    };
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  Configuration conf = createShortCircuitConf(
      "testMultipleWaitersOnShortCircuitCache", sockDir);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  final DistributedFileSystem dfs = cluster.getFileSystem();
  final Path testPath = new Path(TEST_FILE);
  DFSTestUtil.createFile(dfs, testPath, TEST_FILE_LEN, (short)1, SEED);
  final Runnable readerTask = new Runnable() {
    @Override
    public void run() {
      try {
        final byte[] actual = DFSTestUtil.readFileBuffer(dfs, testPath);
        Assert.assertFalse(creationIsBlocked.get());
        final byte[] wanted =
            DFSTestUtil.calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
        Assert.assertTrue(Arrays.equals(actual, wanted));
      } catch (Throwable e) {
        // Failures in reader threads are recorded and asserted on later.
        LOG.error("readerRunnable error", e);
        readerFailed.set(true);
      }
    }
  };
  final Thread[] readers = new Thread[NUM_THREADS];
  for (int idx = 0; idx < readers.length; idx++) {
    final Thread reader = new Thread(readerTask);
    readers[idx] = reader;
    reader.start();
  }
  Thread.sleep(500);
  creationGate.countDown();
  for (Thread reader : readers) {
    Uninterruptibles.joinUninterruptibly(reader);
  }
  cluster.shutdown();
  sockDir.close();
  Assert.assertFalse(readerFailed.get());
}
项目:aliyun-oss-hadoop-fs    文件:TestBlockReaderFactory.java   
/**
 * Test that a client which supports short-circuit reads using
 * shared memory can fall back to not using shared memory when
 * the server doesn't support it.
 *
 * The server configuration is a copy of the client configuration with
 * DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS set to 0.
 * After a successful read, the client's DfsClientShmManager must hold
 * exactly one datanode entry, marked disabled, with no shared memory
 * segments in either its full or notFull collections.
 *
 * The cluster and the temporary socket directory are released in finally
 * blocks so that a failing step does not leak them.
 */
@Test
public void testShortCircuitReadFromServerWithoutShm() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadFromServerWithoutShm", sockDir);
    Configuration serverConf = new Configuration(clientConf);
    // NOTE(review): a value of 0 presumably turns off shared memory
    // support on the DataNode side -- confirm against the server code.
    serverConf.setInt(
        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromServerWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      final DatanodeInfo datanode =
          new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
      cache.getDfsClientShmManager().visit(new Visitor() {
        @Override
        public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
            throws IOException {
          Assert.assertEquals(1,  info.size());
          PerDatanodeVisitorInfo vinfo = info.get(datanode);
          // Fail with an assertion rather than a NullPointerException
          // when the single entry belongs to a different datanode.
          Assert.assertNotNull(vinfo);
          Assert.assertTrue(vinfo.disabled);
          Assert.assertEquals(0, vinfo.full.size());
          Assert.assertEquals(0, vinfo.notFull.size());
        }
      });
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:big-c    文件:BlockReaderFactory.java   
/**
 * Attempts to construct a {@link BlockReaderLocal} for a short-circuit
 * read of the current block.
 *
 * @return a BlockReaderLocal built on the replica returned by the
 *         ShortCircuitCache, or null when the path is not usable for
 *         short-circuit reads or the cache returned no replica.
 * @throws InvalidToken if the replica info fetched from the cache carries
 *         an InvalidToken exception.
 */
private BlockReader getBlockReaderLocal() throws InvalidToken {
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to construct a BlockReaderLocal " +
        "for short-circuit reads.");
  }
  // Resolve the path info only if it has not been set already.
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
                    getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForShortCircuit()) {
    // Skip building the message when the advisory logger would drop it.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": " + pathInfo + " is not " +
          "usable for short circuit; giving up on BlockReaderLocal.");
    }
    return null;
  }
  ShortCircuitCache cache = clientContext.getShortCircuitCache();
  ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
  ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
  InvalidToken exc = info.getInvalidTokenException();
  if (exc != null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": got InvalidToken exception while trying to " +
          "construct BlockReaderLocal via " + pathInfo.getPath());
    }
    throw exc;
  }
  if (info.getReplica() == null) {
    // Guard on the logger that is actually written to. The previous guard
    // (LOG.isTraceEnabled()) suppressed this debug-level advisory unless
    // trace logging was enabled on the factory's own logger.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": failed to get " +
          "ShortCircuitReplica. Cannot construct " +
          "BlockReaderLocal via " + pathInfo.getPath());
    }
    return null;
  }
  return new BlockReaderLocal.Builder(conf).
      setFilename(fileName).
      setBlock(block).
      setStartOffset(startOffset).
      setShortCircuitReplica(info.getReplica()).
      setVerifyChecksum(verifyChecksum).
      setCachingStrategy(cachingStrategy).
      setStorageType(storageType).
      build();
}
项目:big-c    文件:ClientContext.java   
/**
 * Returns the ShortCircuitCache held by this ClientContext.
 *
 * @return the value of the shortCircuitCache field.
 */
public ShortCircuitCache getShortCircuitCache() {
  return this.shortCircuitCache;
}
项目:big-c    文件:TestBlockReaderFactory.java   
/**
 * Verifies that several threads waiting on the ShortCircuitCache for the
 * same ShortCircuitReplica result in only one call to
 * createShortCircuitReplicaInfo.
 *
 * The replica-creation callback blocks on a latch while the reader threads
 * are started. After the latch is released, any invocation of the callback
 * beyond the first one fails the test. Each reader also checks that the
 * callback has run and that the bytes it read match the seeded contents.
 */
@Test(timeout=60000)
public void testMultipleWaitersOnShortCircuitCache()
    throws Exception {
  final String TEST_FILE = "/test_file";
  final int TEST_FILE_LEN = 4000;
  final int SEED = 0xFADED;
  final int NUM_THREADS = 10;
  final CountDownLatch creationGate = new CountDownLatch(1);
  final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
  final AtomicBoolean readerFailed = new AtomicBoolean(false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  BlockReaderFactory.createShortCircuitReplicaInfoCallback =
    new ShortCircuitCache.ShortCircuitReplicaCreator() {
      @Override
      public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
        // Hold every creation attempt until the main thread opens the gate.
        Uninterruptibles.awaitUninterruptibly(creationGate);
        final boolean firstCall =
            creationIsBlocked.compareAndSet(true, false);
        if (!firstCall) {
          Assert.fail("there were multiple calls to "
              + "createShortCircuitReplicaInfo.  Only one was expected.");
        }
        return null;
      }
    };
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  Configuration conf = createShortCircuitConf(
      "testMultipleWaitersOnShortCircuitCache", sockDir);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  final DistributedFileSystem dfs = cluster.getFileSystem();
  final Path testPath = new Path(TEST_FILE);
  DFSTestUtil.createFile(dfs, testPath, TEST_FILE_LEN, (short)1, SEED);
  final Runnable readerTask = new Runnable() {
    @Override
    public void run() {
      try {
        final byte[] actual = DFSTestUtil.readFileBuffer(dfs, testPath);
        Assert.assertFalse(creationIsBlocked.get());
        final byte[] wanted =
            DFSTestUtil.calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
        Assert.assertTrue(Arrays.equals(actual, wanted));
      } catch (Throwable e) {
        // Failures in reader threads are recorded and asserted on later.
        LOG.error("readerRunnable error", e);
        readerFailed.set(true);
      }
    }
  };
  final Thread[] readers = new Thread[NUM_THREADS];
  for (int idx = 0; idx < readers.length; idx++) {
    final Thread reader = new Thread(readerTask);
    readers[idx] = reader;
    reader.start();
  }
  Thread.sleep(500);
  creationGate.countDown();
  for (Thread reader : readers) {
    Uninterruptibles.joinUninterruptibly(reader);
  }
  cluster.shutdown();
  sockDir.close();
  Assert.assertFalse(readerFailed.get());
}
项目:big-c    文件:TestBlockReaderFactory.java   
/**
 * Test that a client which supports short-circuit reads using
 * shared memory can fall back to not using shared memory when
 * the server doesn't support it.
 *
 * The server configuration is a copy of the client configuration with
 * DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS set to 0.
 * After a successful read, the client's DfsClientShmManager must hold
 * exactly one datanode entry, marked disabled, with no shared memory
 * segments in either its full or notFull collections.
 *
 * The cluster and the temporary socket directory are released in finally
 * blocks so that a failing step does not leak them.
 */
@Test
public void testShortCircuitReadFromServerWithoutShm() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  try {
    Configuration clientConf = createShortCircuitConf(
        "testShortCircuitReadFromServerWithoutShm", sockDir);
    Configuration serverConf = new Configuration(clientConf);
    // NOTE(review): a value of 0 presumably turns off shared memory
    // support on the DataNode side -- confirm against the server code.
    serverConf.setInt(
        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
    DFSInputStream.tcpReadsDisabledForTesting = true;
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      clientConf.set(DFS_CLIENT_CONTEXT,
          "testShortCircuitReadFromServerWithoutShm_clientContext");
      final DistributedFileSystem fs =
          (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
      final String TEST_FILE = "/test_file";
      final int TEST_FILE_LEN = 4000;
      final int SEED = 0xFADEC;
      DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
          (short)1, SEED);
      byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
      byte expected[] = DFSTestUtil.
          calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
      Assert.assertTrue(Arrays.equals(contents, expected));
      final ShortCircuitCache cache =
          fs.dfs.getClientContext().getShortCircuitCache();
      final DatanodeInfo datanode =
          new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
      cache.getDfsClientShmManager().visit(new Visitor() {
        @Override
        public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
            throws IOException {
          Assert.assertEquals(1,  info.size());
          PerDatanodeVisitorInfo vinfo = info.get(datanode);
          // Fail with an assertion rather than a NullPointerException
          // when the single entry belongs to a different datanode.
          Assert.assertNotNull(vinfo);
          Assert.assertTrue(vinfo.disabled);
          Assert.assertEquals(0, vinfo.full.size());
          Assert.assertEquals(0, vinfo.notFull.size());
        }
      });
    } finally {
      cluster.shutdown();
    }
  } finally {
    sockDir.close();
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:BlockReaderFactory.java   
/**
 * Attempts to construct a {@link BlockReaderLocal} for a short-circuit
 * read of the current block.
 *
 * @return a BlockReaderLocal built on the replica returned by the
 *         ShortCircuitCache, or null when the path is not usable for
 *         short-circuit reads or the cache returned no replica.
 * @throws InvalidToken if the replica info fetched from the cache carries
 *         an InvalidToken exception.
 */
private BlockReader getBlockReaderLocal() throws InvalidToken {
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to construct a BlockReaderLocal " +
        "for short-circuit reads.");
  }
  // Resolve the path info only if it has not been set already.
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
                    getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForShortCircuit()) {
    // Skip building the message when the advisory logger would drop it.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": " + pathInfo + " is not " +
          "usable for short circuit; giving up on BlockReaderLocal.");
    }
    return null;
  }
  ShortCircuitCache cache = clientContext.getShortCircuitCache();
  ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
  ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
  InvalidToken exc = info.getInvalidTokenException();
  if (exc != null) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": got InvalidToken exception while trying to " +
          "construct BlockReaderLocal via " + pathInfo.getPath());
    }
    throw exc;
  }
  if (info.getReplica() == null) {
    // Guard on the logger that is actually written to. The previous guard
    // (LOG.isTraceEnabled()) suppressed this debug-level advisory unless
    // trace logging was enabled on the factory's own logger.
    if (PerformanceAdvisory.LOG.isDebugEnabled()) {
      PerformanceAdvisory.LOG.debug(this + ": failed to get " +
          "ShortCircuitReplica. Cannot construct " +
          "BlockReaderLocal via " + pathInfo.getPath());
    }
    return null;
  }
  return new BlockReaderLocal.Builder(conf).
      setFilename(fileName).
      setBlock(block).
      setStartOffset(startOffset).
      setShortCircuitReplica(info.getReplica()).
      setVerifyChecksum(verifyChecksum).
      setCachingStrategy(cachingStrategy).
      setStorageType(storageType).
      build();
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ClientContext.java   
/**
 * Returns the ShortCircuitCache held by this ClientContext.
 *
 * @return the value of the shortCircuitCache field.
 */
public ShortCircuitCache getShortCircuitCache() {
  return this.shortCircuitCache;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestBlockReaderFactory.java   
/**
 * Verifies that several threads waiting on the ShortCircuitCache for the
 * same ShortCircuitReplica result in only one call to
 * createShortCircuitReplicaInfo.
 *
 * The replica-creation callback blocks on a latch while the reader threads
 * are started. After the latch is released, any invocation of the callback
 * beyond the first one fails the test. Each reader also checks that the
 * callback has run and that the bytes it read match the seeded contents.
 */
@Test(timeout=60000)
public void testMultipleWaitersOnShortCircuitCache()
    throws Exception {
  final String TEST_FILE = "/test_file";
  final int TEST_FILE_LEN = 4000;
  final int SEED = 0xFADED;
  final int NUM_THREADS = 10;
  final CountDownLatch creationGate = new CountDownLatch(1);
  final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
  final AtomicBoolean readerFailed = new AtomicBoolean(false);
  DFSInputStream.tcpReadsDisabledForTesting = true;
  BlockReaderFactory.createShortCircuitReplicaInfoCallback =
    new ShortCircuitCache.ShortCircuitReplicaCreator() {
      @Override
      public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
        // Hold every creation attempt until the main thread opens the gate.
        Uninterruptibles.awaitUninterruptibly(creationGate);
        final boolean firstCall =
            creationIsBlocked.compareAndSet(true, false);
        if (!firstCall) {
          Assert.fail("there were multiple calls to "
              + "createShortCircuitReplicaInfo.  Only one was expected.");
        }
        return null;
      }
    };
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  Configuration conf = createShortCircuitConf(
      "testMultipleWaitersOnShortCircuitCache", sockDir);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  final DistributedFileSystem dfs = cluster.getFileSystem();
  final Path testPath = new Path(TEST_FILE);
  DFSTestUtil.createFile(dfs, testPath, TEST_FILE_LEN, (short)1, SEED);
  final Runnable readerTask = new Runnable() {
    @Override
    public void run() {
      try {
        final byte[] actual = DFSTestUtil.readFileBuffer(dfs, testPath);
        Assert.assertFalse(creationIsBlocked.get());
        final byte[] wanted =
            DFSTestUtil.calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
        Assert.assertTrue(Arrays.equals(actual, wanted));
      } catch (Throwable e) {
        // Failures in reader threads are recorded and asserted on later.
        LOG.error("readerRunnable error", e);
        readerFailed.set(true);
      }
    }
  };
  final Thread[] readers = new Thread[NUM_THREADS];
  for (int idx = 0; idx < readers.length; idx++) {
    final Thread reader = new Thread(readerTask);
    readers[idx] = reader;
    reader.start();
  }
  Thread.sleep(500);
  creationGate.countDown();
  for (Thread reader : readers) {
    Uninterruptibles.joinUninterruptibly(reader);
  }
  cluster.shutdown();
  sockDir.close();
  Assert.assertFalse(readerFailed.get());
}