Java 类org.apache.hadoop.hdfs.ExtendedBlockId 实例源码

项目:hadoop    文件:DfsClientShmManager.java   
/**
 * Pull a slot out of a preexisting shared memory segment.
 *
 * Must be called with the manager lock held.
 *
 * @param blockId     The blockId to put inside the Slot object.
 *
 * @return            null if none of our shared memory segments contain a
 *                      free slot; the slot object otherwise.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
  DfsClientShm shm = entry.getValue();
  ShmId shmId = shm.getShmId();
  Slot slot = shm.allocAndRegisterSlot(blockId);
  if (shm.isFull()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
    DfsClientShm removedShm = notFull.remove(shmId);
    Preconditions.checkState(removedShm == shm);
    full.put(shmId, shm);
  } else {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
  }
  return slot;
}
项目:hadoop    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:hadoop    文件:ShortCircuitShm.java   
/**
 * Allocate a new slot and register it.
 *
 * This function chooses an empty slot, initializes it, and then returns
 * the relevant Slot object.
 *
 * @return    The new slot.
 */
synchronized public final Slot allocAndRegisterSlot(
    ExtendedBlockId blockId) {
  int idx = allocatedSlots.nextClearBit(0);
  if (idx >= slots.length) {
    throw new RuntimeException(this + ": no more slots are available.");
  }
  allocatedSlots.set(idx, true);
  Slot slot = new Slot(calculateSlotAddress(idx), blockId);
  slot.clear();
  slot.makeValid();
  slots[idx] = slot;
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
                StringUtils.getStackTrace(Thread.currentThread()));
  }
  return slot;
}
项目:hadoop    文件:ShortCircuitReplica.java   
public ShortCircuitReplica(ExtendedBlockId key,
    FileInputStream dataStream, FileInputStream metaStream,
    ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
  this.key = key;
  this.dataStream = dataStream;
  this.metaStream = metaStream;
  this.metaHeader =
        BlockMetadataHeader.preadHeader(metaStream.getChannel());
  if (metaHeader.getVersion() != 1) {
    throw new IOException("invalid metadata header version " +
        metaHeader.getVersion() + ".  Can only handle version 1.");
  }
  this.cache = cache;
  this.creationTimeMs = creationTimeMs;
  this.slot = slot;
}
项目:hadoop    文件:FsDatasetCache.java   
/**
 * Attempt to begin caching a block.
 */
synchronized void cacheBlock(long blockId, String bpid,
    String blockFileName, long length, long genstamp,
    Executor volumeExecutor) {
  ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
  Value prevValue = mappableBlockMap.get(key);
  if (prevValue != null) {
    LOG.debug("Block with id {}, pool {} already exists in the "
            + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
    );
    numBlocksFailedToCache.incrementAndGet();
    return;
  }
  mappableBlockMap.put(key, new Value(null, State.CACHING));
  volumeExecutor.execute(
      new CachingTask(key, blockFileName, length, genstamp));
  LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
      bpid);
}
项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Invalidate a block but does not delete the actual on-disk block file.
 *
 * It should only be used when deactivating disks.
 *
 * @param bpid the block pool ID.
 * @param block The block to be invalidated.
 */
public void invalidate(String bpid, ReplicaInfo block) {
  // If a DFSClient has the replica in its cache of short-circuit file
  // descriptors (and the client is using ShortCircuitShm), invalidate it.
  // The short-circuit registry is null in the unit tests, because the
  // datanode is mock object.
  if (datanode.getShortCircuitRegistry() != null) {
    datanode.getShortCircuitRegistry().processBlockInvalidation(
        new ExtendedBlockId(block.getBlockId(), bpid));

    // If the block is cached, start uncaching it.
    cacheManager.uncacheBlock(bpid, block.getBlockId());
  }

  datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
      block.getStorageUuid());
}
项目:hadoop    文件:ShortCircuitRegistry.java   
/**
 * Invalidate any slot associated with a blockId that we are invalidating
 * (deleting) from this DataNode.  When a slot is invalid, the DFSClient will
 * not use the corresponding replica for new read or mmap operations (although
 * existing, ongoing read or mmap operations will complete.)
 *
 * @param blockId        The block ID.
 */
public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
  if (!enabled) return;
  final Set<Slot> affectedSlots = slots.get(blockId);
  if (!affectedSlots.isEmpty()) {
    final StringBuilder bld = new StringBuilder();
    String prefix = "";
    bld.append("Block ").append(blockId).append(" has been invalidated.  ").
        append("Marking short-circuit slots as invalid: ");
    for (Slot slot : affectedSlots) {
      slot.makeInvalid();
      bld.append(prefix).append(slot.toString());
      prefix = ", ";
    }
    LOG.info(bld.toString());
  }
}
项目:hadoop    文件:TestEnhancedByteBufferAccess.java   
@Override
public void visit(int numOutstandingMmaps,
    Map<ExtendedBlockId, ShortCircuitReplica> replicas,
    Map<ExtendedBlockId, InvalidToken> failedLoads,
    Map<Long, ShortCircuitReplica> evictable,
    Map<Long, ShortCircuitReplica> evictableMmapped) {
  if (expectedNumOutstandingMmaps >= 0) {
    Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
  }
  if (expectedNumReplicas >= 0) {
    Assert.assertEquals(expectedNumReplicas, replicas.size());
  }
  if (expectedNumEvictable >= 0) {
    Assert.assertEquals(expectedNumEvictable, evictable.size());
  }
  if (expectedNumMmapedEvictable >= 0) {
    Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
  }
}
项目:aliyun-oss-hadoop-fs    文件:DfsClientShmManager.java   
/**
 * Pull a slot out of a preexisting shared memory segment.
 *
 * Must be called with the manager lock held.
 *
 * @param blockId     The blockId to put inside the Slot object.
 *
 * @return            null if none of our shared memory segments contain a
 *                      free slot; the slot object otherwise.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
  DfsClientShm shm = entry.getValue();
  ShmId shmId = shm.getShmId();
  Slot slot = shm.allocAndRegisterSlot(blockId);
  if (shm.isFull()) {
    LOG.trace("{}: pulled the last slot {} out of {}",
        this, slot.getSlotIdx(), shm);
    DfsClientShm removedShm = notFull.remove(shmId);
    Preconditions.checkState(removedShm == shm);
    full.put(shmId, shm);
  } else {
    LOG.trace("{}: pulled slot {} out of {}", this, slot.getSlotIdx(), shm);
  }
  return slot;
}
项目:aliyun-oss-hadoop-fs    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:aliyun-oss-hadoop-fs    文件:ShortCircuitShm.java   
/**
 * Allocate a new slot and register it.
 *
 * This function chooses an empty slot, initializes it, and then returns
 * the relevant Slot object.
 *
 * @return    The new slot.
 */
synchronized public final Slot allocAndRegisterSlot(
    ExtendedBlockId blockId) {
  int idx = allocatedSlots.nextClearBit(0);
  if (idx >= slots.length) {
    throw new RuntimeException(this + ": no more slots are available.");
  }
  allocatedSlots.set(idx, true);
  Slot slot = new Slot(calculateSlotAddress(idx), blockId);
  slot.clear();
  slot.makeValid();
  slots[idx] = slot;
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
                StringUtils.getStackTrace(Thread.currentThread()));
  }
  return slot;
}
项目:aliyun-oss-hadoop-fs    文件:ShortCircuitReplica.java   
public ShortCircuitReplica(ExtendedBlockId key,
    FileInputStream dataStream, FileInputStream metaStream,
    ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
  this.key = key;
  this.dataStream = dataStream;
  this.metaStream = metaStream;
  this.metaHeader =
        BlockMetadataHeader.preadHeader(metaStream.getChannel());
  if (metaHeader.getVersion() != 1) {
    throw new IOException("invalid metadata header version " +
        metaHeader.getVersion() + ".  Can only handle version 1.");
  }
  this.cache = cache;
  this.creationTimeMs = creationTimeMs;
  this.slot = slot;
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetCache.java   
/**
 * Attempt to begin caching a block.
 */
synchronized void cacheBlock(long blockId, String bpid,
    String blockFileName, long length, long genstamp,
    Executor volumeExecutor) {
  ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
  Value prevValue = mappableBlockMap.get(key);
  if (prevValue != null) {
    LOG.debug("Block with id {}, pool {} already exists in the "
            + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
    );
    numBlocksFailedToCache.incrementAndGet();
    return;
  }
  mappableBlockMap.put(key, new Value(null, State.CACHING));
  volumeExecutor.execute(
      new CachingTask(key, blockFileName, length, genstamp));
  LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
      bpid);
}
项目:aliyun-oss-hadoop-fs    文件:ShortCircuitRegistry.java   
/**
 * Invalidate any slot associated with a blockId that we are invalidating
 * (deleting) from this DataNode.  When a slot is invalid, the DFSClient will
 * not use the corresponding replica for new read or mmap operations (although
 * existing, ongoing read or mmap operations will complete.)
 *
 * @param blockId        The block ID.
 */
public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
  if (!enabled) return;
  final Set<Slot> affectedSlots = slots.get(blockId);
  if (!affectedSlots.isEmpty()) {
    final StringBuilder bld = new StringBuilder();
    String prefix = "";
    bld.append("Block ").append(blockId).append(" has been invalidated.  ").
        append("Marking short-circuit slots as invalid: ");
    for (Slot slot : affectedSlots) {
      slot.makeInvalid();
      bld.append(prefix).append(slot.toString());
      prefix = ", ";
    }
    LOG.info(bld.toString());
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestEnhancedByteBufferAccess.java   
@Override
public void visit(int numOutstandingMmaps,
    Map<ExtendedBlockId, ShortCircuitReplica> replicas,
    Map<ExtendedBlockId, InvalidToken> failedLoads,
    Map<Long, ShortCircuitReplica> evictable,
    Map<Long, ShortCircuitReplica> evictableMmapped) {
  if (expectedNumOutstandingMmaps >= 0) {
    Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
  }
  if (expectedNumReplicas >= 0) {
    Assert.assertEquals(expectedNumReplicas, replicas.size());
  }
  if (expectedNumEvictable >= 0) {
    Assert.assertEquals(expectedNumEvictable, evictable.size());
  }
  if (expectedNumMmapedEvictable >= 0) {
    Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
  }
}
项目:big-c    文件:DfsClientShmManager.java   
/**
 * Pull a slot out of a preexisting shared memory segment.
 *
 * Must be called with the manager lock held.
 *
 * @param blockId     The blockId to put inside the Slot object.
 *
 * @return            null if none of our shared memory segments contain a
 *                      free slot; the slot object otherwise.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
  DfsClientShm shm = entry.getValue();
  ShmId shmId = shm.getShmId();
  Slot slot = shm.allocAndRegisterSlot(blockId);
  if (shm.isFull()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
    DfsClientShm removedShm = notFull.remove(shmId);
    Preconditions.checkState(removedShm == shm);
    full.put(shmId, shm);
  } else {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
  }
  return slot;
}
项目:big-c    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:big-c    文件:ShortCircuitShm.java   
/**
 * Allocate a new slot and register it.
 *
 * This function chooses an empty slot, initializes it, and then returns
 * the relevant Slot object.
 *
 * @return    The new slot.
 */
synchronized public final Slot allocAndRegisterSlot(
    ExtendedBlockId blockId) {
  int idx = allocatedSlots.nextClearBit(0);
  if (idx >= slots.length) {
    throw new RuntimeException(this + ": no more slots are available.");
  }
  allocatedSlots.set(idx, true);
  Slot slot = new Slot(calculateSlotAddress(idx), blockId);
  slot.clear();
  slot.makeValid();
  slots[idx] = slot;
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
                StringUtils.getStackTrace(Thread.currentThread()));
  }
  return slot;
}
项目:big-c    文件:ShortCircuitReplica.java   
public ShortCircuitReplica(ExtendedBlockId key,
    FileInputStream dataStream, FileInputStream metaStream,
    ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
  this.key = key;
  this.dataStream = dataStream;
  this.metaStream = metaStream;
  this.metaHeader =
        BlockMetadataHeader.preadHeader(metaStream.getChannel());
  if (metaHeader.getVersion() != 1) {
    throw new IOException("invalid metadata header version " +
        metaHeader.getVersion() + ".  Can only handle version 1.");
  }
  this.cache = cache;
  this.creationTimeMs = creationTimeMs;
  this.slot = slot;
}
项目:big-c    文件:FsDatasetCache.java   
/**
 * Attempt to begin caching a block.
 */
synchronized void cacheBlock(long blockId, String bpid,
    String blockFileName, long length, long genstamp,
    Executor volumeExecutor) {
  ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
  Value prevValue = mappableBlockMap.get(key);
  if (prevValue != null) {
    LOG.debug("Block with id {}, pool {} already exists in the "
            + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
    );
    numBlocksFailedToCache.incrementAndGet();
    return;
  }
  mappableBlockMap.put(key, new Value(null, State.CACHING));
  volumeExecutor.execute(
      new CachingTask(key, blockFileName, length, genstamp));
  LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
      bpid);
}
项目:big-c    文件:FsDatasetImpl.java   
/**
 * Invalidate a block but does not delete the actual on-disk block file.
 *
 * It should only be used when deactivating disks.
 *
 * @param bpid the block pool ID.
 * @param block The block to be invalidated.
 */
public void invalidate(String bpid, ReplicaInfo block) {
  // If a DFSClient has the replica in its cache of short-circuit file
  // descriptors (and the client is using ShortCircuitShm), invalidate it.
  // The short-circuit registry is null in the unit tests, because the
  // datanode is mock object.
  if (datanode.getShortCircuitRegistry() != null) {
    datanode.getShortCircuitRegistry().processBlockInvalidation(
        new ExtendedBlockId(block.getBlockId(), bpid));

    // If the block is cached, start uncaching it.
    cacheManager.uncacheBlock(bpid, block.getBlockId());
  }

  datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
      block.getStorageUuid());
}
项目:big-c    文件:ShortCircuitRegistry.java   
/**
 * Invalidate any slot associated with a blockId that we are invalidating
 * (deleting) from this DataNode.  When a slot is invalid, the DFSClient will
 * not use the corresponding replica for new read or mmap operations (although
 * existing, ongoing read or mmap operations will complete.)
 *
 * @param blockId        The block ID.
 */
public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
  if (!enabled) return;
  final Set<Slot> affectedSlots = slots.get(blockId);
  if (!affectedSlots.isEmpty()) {
    final StringBuilder bld = new StringBuilder();
    String prefix = "";
    bld.append("Block ").append(blockId).append(" has been invalidated.  ").
        append("Marking short-circuit slots as invalid: ");
    for (Slot slot : affectedSlots) {
      slot.makeInvalid();
      bld.append(prefix).append(slot.toString());
      prefix = ", ";
    }
    LOG.info(bld.toString());
  }
}
项目:big-c    文件:TestEnhancedByteBufferAccess.java   
@Override
public void visit(int numOutstandingMmaps,
    Map<ExtendedBlockId, ShortCircuitReplica> replicas,
    Map<ExtendedBlockId, InvalidToken> failedLoads,
    Map<Long, ShortCircuitReplica> evictable,
    Map<Long, ShortCircuitReplica> evictableMmapped) {
  if (expectedNumOutstandingMmaps >= 0) {
    Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
  }
  if (expectedNumReplicas >= 0) {
    Assert.assertEquals(expectedNumReplicas, replicas.size());
  }
  if (expectedNumEvictable >= 0) {
    Assert.assertEquals(expectedNumEvictable, evictable.size());
  }
  if (expectedNumMmapedEvictable >= 0) {
    Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:DfsClientShmManager.java   
/**
 * Pull a slot out of a preexisting shared memory segment.
 *
 * Must be called with the manager lock held.
 *
 * @param blockId     The blockId to put inside the Slot object.
 *
 * @return            null if none of our shared memory segments contain a
 *                      free slot; the slot object otherwise.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
  DfsClientShm shm = entry.getValue();
  ShmId shmId = shm.getShmId();
  Slot slot = shm.allocAndRegisterSlot(blockId);
  if (shm.isFull()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
    DfsClientShm removedShm = notFull.remove(shmId);
    Preconditions.checkState(removedShm == shm);
    full.put(shmId, shm);
  } else {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
  }
  return slot;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ShortCircuitShm.java   
/**
 * Allocate a new slot and register it.
 *
 * This function chooses an empty slot, initializes it, and then returns
 * the relevant Slot object.
 *
 * @return    The new slot.
 */
synchronized public final Slot allocAndRegisterSlot(
    ExtendedBlockId blockId) {
  int idx = allocatedSlots.nextClearBit(0);
  if (idx >= slots.length) {
    throw new RuntimeException(this + ": no more slots are available.");
  }
  allocatedSlots.set(idx, true);
  Slot slot = new Slot(calculateSlotAddress(idx), blockId);
  slot.clear();
  slot.makeValid();
  slots[idx] = slot;
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
                StringUtils.getStackTrace(Thread.currentThread()));
  }
  return slot;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ShortCircuitReplica.java   
public ShortCircuitReplica(ExtendedBlockId key,
    FileInputStream dataStream, FileInputStream metaStream,
    ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
  this.key = key;
  this.dataStream = dataStream;
  this.metaStream = metaStream;
  this.metaHeader =
        BlockMetadataHeader.preadHeader(metaStream.getChannel());
  if (metaHeader.getVersion() != 1) {
    throw new IOException("invalid metadata header version " +
        metaHeader.getVersion() + ".  Can only handle version 1.");
  }
  this.cache = cache;
  this.creationTimeMs = creationTimeMs;
  this.slot = slot;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:FsDatasetCache.java   
/**
 * Attempt to begin caching a block.
 */
synchronized void cacheBlock(long blockId, String bpid,
    String blockFileName, long length, long genstamp,
    Executor volumeExecutor) {
  ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
  Value prevValue = mappableBlockMap.get(key);
  if (prevValue != null) {
    LOG.debug("Block with id {}, pool {} already exists in the "
            + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
    );
    numBlocksFailedToCache.incrementAndGet();
    return;
  }
  mappableBlockMap.put(key, new Value(null, State.CACHING));
  volumeExecutor.execute(
      new CachingTask(key, blockFileName, length, genstamp));
  LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
      bpid);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:FsDatasetImpl.java   
/**
 * Invalidate a block but does not delete the actual on-disk block file.
 *
 * It should only be used when deactivating disks.
 *
 * @param bpid the block pool ID.
 * @param block The block to be invalidated.
 */
public void invalidate(String bpid, ReplicaInfo block) {
  // If a DFSClient has the replica in its cache of short-circuit file
  // descriptors (and the client is using ShortCircuitShm), invalidate it.
  // The short-circuit registry is null in the unit tests, because the
  // datanode is mock object.
  if (datanode.getShortCircuitRegistry() != null) {
    datanode.getShortCircuitRegistry().processBlockInvalidation(
        new ExtendedBlockId(block.getBlockId(), bpid));

    // If the block is cached, start uncaching it.
    cacheManager.uncacheBlock(bpid, block.getBlockId());
  }

  datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
      block.getStorageUuid());
}
项目:hadoop-2.6.0-cdh5.4.3    文件:ShortCircuitRegistry.java   
/**
 * Invalidate any slot associated with a blockId that we are invalidating
 * (deleting) from this DataNode.  When a slot is invalid, the DFSClient will
 * not use the corresponding replica for new read or mmap operations (although
 * existing, ongoing read or mmap operations will complete.)
 *
 * @param blockId        The block ID.
 */
public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
  if (!enabled) return;
  final Set<Slot> affectedSlots = slots.get(blockId);
  if (!affectedSlots.isEmpty()) {
    final StringBuilder bld = new StringBuilder();
    String prefix = "";
    bld.append("Block ").append(blockId).append(" has been invalidated.  ").
        append("Marking short-circuit slots as invalid: ");
    for (Slot slot : affectedSlots) {
      slot.makeInvalid();
      bld.append(prefix).append(slot.toString());
      prefix = ", ";
    }
    LOG.info(bld.toString());
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestEnhancedByteBufferAccess.java   
@Override
public void visit(int numOutstandingMmaps,
    Map<ExtendedBlockId, ShortCircuitReplica> replicas,
    Map<ExtendedBlockId, InvalidToken> failedLoads,
    Map<Long, ShortCircuitReplica> evictable,
    Map<Long, ShortCircuitReplica> evictableMmapped) {
  if (expectedNumOutstandingMmaps >= 0) {
    Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
  }
  if (expectedNumReplicas >= 0) {
    Assert.assertEquals(expectedNumReplicas, replicas.size());
  }
  if (expectedNumEvictable >= 0) {
    Assert.assertEquals(expectedNumEvictable, evictable.size());
  }
  if (expectedNumMmapedEvictable >= 0) {
    Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
  }
}
项目:FlexMap    文件:DfsClientShmManager.java   
/**
 * Pull a slot out of a preexisting shared memory segment.
 *
 * Must be called with the manager lock held.
 *
 * @param blockId     The blockId to put inside the Slot object.
 *
 * @return            null if none of our shared memory segments contain a
 *                      free slot; the slot object otherwise.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
  DfsClientShm shm = entry.getValue();
  ShmId shmId = shm.getShmId();
  Slot slot = shm.allocAndRegisterSlot(blockId);
  if (shm.isFull()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
    DfsClientShm removedShm = notFull.remove(shmId);
    Preconditions.checkState(removedShm == shm);
    full.put(shmId, shm);
  } else {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
  }
  return slot;
}
项目:FlexMap    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:FlexMap    文件:ShortCircuitShm.java   
/**
 * Allocate a new slot and register it.
 *
 * This function chooses an empty slot, initializes it, and then returns
 * the relevant Slot object.
 *
 * @return    The new slot.
 */
synchronized public final Slot allocAndRegisterSlot(
    ExtendedBlockId blockId) {
  int idx = allocatedSlots.nextClearBit(0);
  if (idx >= slots.length) {
    throw new RuntimeException(this + ": no more slots are available.");
  }
  allocatedSlots.set(idx, true);
  Slot slot = new Slot(calculateSlotAddress(idx), blockId);
  slot.clear();
  slot.makeValid();
  slots[idx] = slot;
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
                StringUtils.getStackTrace(Thread.currentThread()));
  }
  return slot;
}
项目:FlexMap    文件:ShortCircuitReplica.java   
public ShortCircuitReplica(ExtendedBlockId key,
    FileInputStream dataStream, FileInputStream metaStream,
    ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
  this.key = key;
  this.dataStream = dataStream;
  this.metaStream = metaStream;
  this.metaHeader =
        BlockMetadataHeader.preadHeader(metaStream.getChannel());
  if (metaHeader.getVersion() != 1) {
    throw new IOException("invalid metadata header version " +
        metaHeader.getVersion() + ".  Can only handle version 1.");
  }
  this.cache = cache;
  this.creationTimeMs = creationTimeMs;
  this.slot = slot;
}
项目:FlexMap    文件:FsDatasetCache.java   
/**
 * Attempt to begin caching a block.
 */
synchronized void cacheBlock(long blockId, String bpid,
    String blockFileName, long length, long genstamp,
    Executor volumeExecutor) {
  ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
  Value prevValue = mappableBlockMap.get(key);
  if (prevValue != null) {
    LOG.debug("Block with id {}, pool {} already exists in the "
            + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
    );
    numBlocksFailedToCache.incrementAndGet();
    return;
  }
  mappableBlockMap.put(key, new Value(null, State.CACHING));
  volumeExecutor.execute(
      new CachingTask(key, blockFileName, length, genstamp));
  LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
      bpid);
}
项目:FlexMap    文件:FsDatasetImpl.java   
/**
 * Invalidate a block but does not delete the actual on-disk block file.
 *
 * It should only be used when deactivating disks.
 *
 * @param bpid the block pool ID.
 * @param block The block to be invalidated.
 */
public void invalidate(String bpid, ReplicaInfo block) {
  // If a DFSClient has the replica in its cache of short-circuit file
  // descriptors (and the client is using ShortCircuitShm), invalidate it.
  // The short-circuit registry is null in the unit tests, because the
  // datanode is mock object.
  if (datanode.getShortCircuitRegistry() != null) {
    datanode.getShortCircuitRegistry().processBlockInvalidation(
        new ExtendedBlockId(block.getBlockId(), bpid));

    // If the block is cached, start uncaching it.
    cacheManager.uncacheBlock(bpid, block.getBlockId());
  }

  datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
      block.getStorageUuid());
}
项目:FlexMap    文件:ShortCircuitRegistry.java   
/**
 * Invalidate any slot associated with a blockId that we are invalidating
 * (deleting) from this DataNode.  When a slot is invalid, the DFSClient will
 * not use the corresponding replica for new read or mmap operations (although
 * existing, ongoing read or mmap operations will complete.)
 *
 * @param blockId        The block ID.
 */
public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
  if (!enabled) return;
  final Set<Slot> affectedSlots = slots.get(blockId);
  if (!affectedSlots.isEmpty()) {
    final StringBuilder bld = new StringBuilder();
    String prefix = "";
    bld.append("Block ").append(blockId).append(" has been invalidated.  ").
        append("Marking short-circuit slots as invalid: ");
    for (Slot slot : affectedSlots) {
      slot.makeInvalid();
      bld.append(prefix).append(slot.toString());
      prefix = ", ";
    }
    LOG.info(bld.toString());
  }
}
项目:FlexMap    文件:TestEnhancedByteBufferAccess.java   
@Override
public void visit(int numOutstandingMmaps,
    Map<ExtendedBlockId, ShortCircuitReplica> replicas,
    Map<ExtendedBlockId, InvalidToken> failedLoads,
    Map<Long, ShortCircuitReplica> evictable,
    Map<Long, ShortCircuitReplica> evictableMmapped) {
  if (expectedNumOutstandingMmaps >= 0) {
    Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
  }
  if (expectedNumReplicas >= 0) {
    Assert.assertEquals(expectedNumReplicas, replicas.size());
  }
  if (expectedNumEvictable >= 0) {
    Assert.assertEquals(expectedNumEvictable, evictable.size());
  }
  if (expectedNumMmapedEvictable >= 0) {
    Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
  }
}
项目:hadoop-on-lustre2    文件:DfsClientShmManager.java   
/**
 * Pull a slot out of a preexisting shared memory segment.
 *
 * Must be called with the manager lock held.
 *
 * @param blockId     The blockId to put inside the Slot object.
 *
 * @return            null if none of our shared memory segments contain a
 *                      free slot; the slot object otherwise.
 */
private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
  if (notFull.isEmpty()) {
    return null;
  }
  Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
  DfsClientShm shm = entry.getValue();
  ShmId shmId = shm.getShmId();
  Slot slot = shm.allocAndRegisterSlot(blockId);
  if (shm.isFull()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
    DfsClientShm removedShm = notFull.remove(shmId);
    Preconditions.checkState(removedShm == shm);
    full.put(shmId, shm);
  } else {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
          " out of " + shm);
    }
  }
  return slot;
}