Java class org.apache.hadoop.hdfs.server.datanode.FSDataset.ActiveFile example source code

Project: RDFS    File: FSDataset.java
/**
 * Recover blocks that were being written when the datanode
 * was earlier shut down. These blocks get re-inserted into
 * ongoingCreates. Also, send a blockreceived message to the NN
 * for each of these blocks because these are not part of a 
 * block report.
 */
private void recoverBlocksBeingWritten(File bbw) throws IOException {
  FSDir fsd = new FSDir(namespaceId, bbw, this.volume);
  LightWeightHashSet<BlockAndFile> blockSet = new LightWeightHashSet<BlockAndFile>();
  fsd.getBlockAndFileInfo(blockSet);
  for (BlockAndFile b : blockSet) {
    File f = b.pathfile;  // full path name of block file
    lock.writeLock().lock();
    try {
      volumeMap.add(namespaceId, b.block, new DatanodeBlockInfo(volume, f,
          DatanodeBlockInfo.UNFINALIZED));
      volumeMap.addOngoingCreates(namespaceId, b.block, ActiveFile.createStartupRecoveryFile(f));
    } finally {
      lock.writeLock().unlock();
    }
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("recoverBlocksBeingWritten for block " + b.block + "namespaceId: "+namespaceId);
    }
  }
}
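
The recovery loop above follows a simple pattern: each block found on disk is registered in both the block map and the in-flight map while holding the write lock, so concurrent readers never observe a half-inserted block. Below is a minimal, self-contained sketch of that two-map registration; RecoverySketch and its fields are hypothetical stand-ins, not the RDFS classes.

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Minimal sketch of the startup-recovery pattern: register each block file
// found on disk in the finalized-block map and the in-flight map under the
// write lock, so readers never see a half-inserted block.
public class RecoverySketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<Long, File> blockMap = new HashMap<>();       // hypothetical: blockId -> block file
  private final Map<Long, File> ongoingCreates = new HashMap<>(); // hypothetical: blocks still being written

  void recoverOne(long blockId, File blockFile) {
    lock.writeLock().lock();
    try {
      blockMap.put(blockId, blockFile);
      ongoingCreates.put(blockId, blockFile); // marks the block as unfinalized
    } finally {
      lock.writeLock().unlock();
    }
  }
}
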
Project: RDFS    File: FSDataset.java
/** Return the block file for the given namespace ID and block ID. */
public File findBlockFile(int namespaceId, long blockId) {
  lock.readLock().lock();
  try {
    final Block eb = new Block(blockId);
    File blockfile = null;
    ActiveFile activefile = volumeMap.getOngoingCreates(namespaceId, eb);
    if (activefile != null) {
      blockfile = activefile.file;
    }
    if (blockfile == null) {
      blockfile = getFile(namespaceId, eb);
    }
    if (blockfile == null) {
      if (DataNode.LOG.isDebugEnabled()) {
        DataNode.LOG.debug("volumeMap=" + volumeMap);
      }
    }
    return blockfile;
  } finally {
    lock.readLock().unlock();
  }
}
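
Note the lookup order in findBlockFile: the in-flight map is consulted before the finalized-block map, so a reader of a block under construction sees the file the writer is appending to. A minimal sketch of that ordering, with hypothetical map types standing in for volumeMap:

import java.io.File;
import java.util.Map;

public class LookupSketch {
  // A block still being written takes precedence over a finalized copy.
  static File findBlockFile(Map<Long, File> ongoingCreates,
                            Map<Long, File> finalized, long blockId) {
    File f = ongoingCreates.get(blockId);
    return (f != null) ? f : finalized.get(blockId);
  }
}
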
Project: RDFS    File: FSDataset.java
/**
 * Return a list of active writer threads for the given block.
 * @return null if there are no such threads or the file is
 * not being created
 */
private ArrayList<Thread> getActiveThreads(int namespaceId, Block block) {
  lock.writeLock().lock();
  try {
    //check ongoing create threads
    final ActiveFile activefile = volumeMap.getOngoingCreates(namespaceId, block);
    if (activefile != null && !activefile.threads.isEmpty()) {
      //remove dead threads
      for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
        final Thread t = i.next();
        if (!t.isAlive()) {
          i.remove();
        }
      }

      //return living threads
      if (!activefile.threads.isEmpty()) {
        return new ArrayList<Thread>(activefile.threads);
      }
    }
  } finally {
    lock.writeLock().unlock();
  }
  return null;
}
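
The dead-thread pruning above relies on Iterator.remove(), the only safe way to delete elements from a collection while iterating over it. A standalone, runnable demonstration of the same idiom (the thread list here is illustrative, not the RDFS ActiveFile.threads field):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PruneDemo {
  public static void main(String[] args) throws InterruptedException {
    List<Thread> threads = new ArrayList<>();
    Thread shortLived = new Thread(() -> {});
    threads.add(shortLived);
    threads.add(Thread.currentThread()); // stays alive for the duration of main()
    shortLived.start();
    shortLived.join(); // after this, !shortLived.isAlive()

    // Remove dead threads in place; removing through the iterator avoids
    // ConcurrentModificationException.
    for (Iterator<Thread> it = threads.iterator(); it.hasNext(); ) {
      if (!it.next().isAlive()) {
        it.remove();
      }
    }
    System.out.println("living threads: " + threads.size()); // prints 1
  }
}
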
Project: RDFS    File: FSDataset.java
/**
 * Remove the temporary block file (if any)
 */
public void unfinalizeBlock(int namespaceId, Block b) throws IOException {
  lock.writeLock().lock();
  try {
    // remove the block from in-memory data structure
    ActiveFile activefile = volumeMap.removeOngoingCreates(namespaceId, b);
    if (activefile == null) {
      return;
    }
    volumeMap.remove(namespaceId, b);

    // delete the on-disk temp file
    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
      DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
    }
  } finally {
    lock.writeLock().unlock();
  }
}
Project: hadoop-EAR    File: VolumeMap.java
ActiveFile getOngoingCreates(int namespaceId, Block block) {
  checkBlock(block);
  NamespaceMap nm = getNamespaceMap(namespaceId);
  if (nm == null) {
    return null;
  }
  return nm.getOngoingCreates(block);
}
Project: hadoop-EAR    File: VolumeMap.java
ActiveFile removeOngoingCreates(int namespaceId, Block block) { 
  checkBlock(block);
  NamespaceMap nm = getNamespaceMap(namespaceId);
  if (nm == null) {
    return null;
  }
  return nm.removeOngoingCreates(block);
}
Project: hadoop-EAR    File: VolumeMap.java
ActiveFile addOngoingCreates(int namespaceId, Block block, ActiveFile af) {
  checkBlock(block);
  NamespaceMap nm = getNamespaceMap(namespaceId);
  if (nm == null) {
    return null;
  }
  return nm.addOngoingCreates(block, af);
}
Project: hadoop-EAR    File: NamespaceMap.java
NamespaceMap(int namespaceId) {
  numBucket = NUM_BUCKETS;
  this.namespaceId = namespaceId;
  ongoingCreates = new ConcurrentHashMap<Block, ActiveFile>();
  blockBuckets = new BlockBucket[numBucket];
  for (int i = 0; i < numBucket; i++) {
    blockBuckets[i] = new BlockBucket(i);
  }
}
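
Note that hadoop-EAR keeps the per-namespace ongoingCreates in a ConcurrentHashMap, so single-key reads and writes need no external lock; contrast the RDFS VolumeMap further below, which guards plain HashMaps with synchronized(this). A simplified sketch of the concurrent variant (the generic types are placeholders):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified per-namespace map: ConcurrentHashMap makes get/put/remove on a
// single key thread-safe without a coarse lock around the whole map.
public class NamespaceSketch<K, V> {
  private final ConcurrentMap<K, V> ongoingCreates = new ConcurrentHashMap<>();

  V get(K block)         { return ongoingCreates.get(block); }
  V remove(K block)      { return ongoingCreates.remove(block); }
  V add(K block, V file) { return ongoingCreates.put(block, file); }
}
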
Project: RDFS    File: FSDataset.java
ActiveFile(File f, List<Thread> list, long expectedSize) throws IOException {
  this(f, false, expectedSize);
  if (list != null) {
    threads.addAll(list);
  }
  threads.add(Thread.currentThread());
}
Project: RDFS    File: FSDataset.java
private ActiveFile(File f, boolean recovery, long expectedSize)
    throws IOException {
  file = f;
  long fileLength = f.length();
  if (expectedSize != UNKNOWN_SIZE && fileLength != expectedSize) {
    throw new IOException("File " + f + " on disk size " + fileLength
        + " doesn't match expected size " + expectedSize);
  }
  bytesAcked = bytesOnDisk = fileLength;
  wasRecoveredOnStartup = recovery;
}
Project: RDFS    File: FSDataset.java
@Override
public long getOnDiskLength(int namespaceId, Block b) throws IOException {
  ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, b);

  if (activeFile != null) {
    return activeFile.getBytesOnDisk();
  } else {
    return getFinalizedBlockLength(namespaceId, b);
  }
}
Project: RDFS    File: FSDataset.java
@Override
public long getVisibleLength(int namespaceId, Block b) throws IOException {
  ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, b);

  if (activeFile != null) {
    return activeFile.getBytesAcked();
  } else {
    return getFinalizedBlockLength(namespaceId, b);
  }
}
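
Together these two accessors expose the distinction ActiveFile maintains: bytesOnDisk is how much data has been written to the local file, while bytesAcked is how much the downstream pipeline has acknowledged, so a reader tailing an open file never sees unacknowledged bytes. A simplified model of that invariant (this class is a sketch, not the RDFS implementation):

// Simplified model of the two lengths an in-flight replica tracks.
// Invariant: bytesAcked <= bytesOnDisk. The visible length serves readers;
// the on-disk length serves recovery and finalization.
public class ReplicaLengths {
  private volatile long bytesOnDisk; // written to the local file
  private volatile long bytesAcked;  // acknowledged by downstream datanodes

  void onLocalWrite(long newLength) { bytesOnDisk = newLength; }

  void onPipelineAck(long ackedLength) {
    assert ackedLength <= bytesOnDisk;
    bytesAcked = ackedLength;
  }

  long visibleLength() { return bytesAcked; }  // what a reader may consume
  long onDiskLength()  { return bytesOnDisk; } // what exists in the file
}
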
Project: RDFS    File: FSDataset.java
/**
 * Complete the block write!
 */
public void finalizeBlockInternal(int namespaceId, Block b, boolean reFinalizeOk)
  throws IOException {
  lock.writeLock().lock();
  try {
    DatanodeBlockInfo replicaInfo = volumeMap.get(namespaceId, b);
    ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, b);
    if (activeFile == null) {
      if (reFinalizeOk) {
        return;
      } else {
        throw new IOException("Block " + b + " is already finalized.");
      }
    }
    File f = activeFile.file;
    if (f == null || !f.exists()) {
      throw new IOException("No temporary file " + f + " for block " + b);
    }
    FSVolume v = replicaInfo.getVolume();
    if (v == null) {
      throw new IOException("No volume for temporary file " + f + 
                            " for block " + b);
    }

    File dest = v.addBlock(namespaceId, b, f);
    volumeMap.add(namespaceId, b,
        new DatanodeBlockInfo(v, dest, activeFile.getBytesOnDisk()));
    volumeMap.removeOngoingCreates(namespaceId, b);
  } finally {
    lock.writeLock().unlock();
  }
}
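
Finalization follows a fixed ordering under the write lock: promote the temporary file into the finalized directory, publish the new location in the block map, then retire the in-flight entry. A condensed, self-contained sketch of that sequence (the map types and the plain rename are simplifications of v.addBlock and volumeMap):

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch of the finalize ordering: move the temp file into place, publish
// the final location, then drop the in-flight entry, all under one lock so
// no reader observes an intermediate state.
public class FinalizeSketch {
  private final Map<Long, File> finalized = new HashMap<>();
  private final Map<Long, File> ongoingCreates = new HashMap<>();

  synchronized void finalizeBlock(long blockId, File tmp, File finalDir)
      throws IOException {
    File dest = new File(finalDir, tmp.getName());
    if (!tmp.renameTo(dest)) {        // step 1: move into place
      throw new IOException("rename failed: " + tmp + " -> " + dest);
    }
    finalized.put(blockId, dest);     // step 2: publish the final location
    ongoingCreates.remove(blockId);   // step 3: no longer in-flight
  }
}
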
Project: RDFS    File: FSDataset.java
private boolean isBlockFinalizedInternal(int namespaceId, Block b,
    boolean validate) {
  DatanodeBlockInfo blockInfo = volumeMap.get(namespaceId, b);

  // In the validate case we skip the null check to avoid redundant code
  // while keeping the old code's behavior. This looks like a bug; it will
  // be fixed in a separate patch.
  //
  if (!validate && blockInfo == null) {
    return false; // block is not finalized
  }
  FSVolume v = blockInfo.getVolume();
  if (v == null) {
    DataNode.LOG.warn("No volume for block " + b);
    return false; // block is not finalized
  }
  ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, b);
  if (activeFile != null) {
    if (validate) {
      File f = activeFile.file;
      if (f == null || !f.exists()) {
        // we should never get into this position.
        DataNode.LOG.warn("No temporary file " + f + " for block " + b);
      }
    }
    return false; // block is not finalized
  }
  return true; // block is finalized
}
Project: RDFS    File: VolumeMap.java
synchronized void initNamespace(int namespaceId) {
  Map<Block, DatanodeBlockInfo> m = namespaceMap.get(namespaceId);
  if(m != null){
    return; 
  }
  m = new HashMap<Block, DatanodeBlockInfo>();
  namespaceMap.put(namespaceId, m);    
  Map<Block, ActiveFile> oc = new HashMap<Block, ActiveFile>();
  ongoingCreates.put(namespaceId, oc);
}
Project: RDFS    File: VolumeMap.java
ActiveFile getOngoingCreates(int namespaceId, Block block) {
  checkBlock(block);
  synchronized(this){
    Map<Block, ActiveFile> m = ongoingCreates.get(namespaceId);
    return m != null ? m.get(block) : null;
  }
}
Project: RDFS    File: VolumeMap.java
ActiveFile removeOngoingCreates(int namespaceId, Block block) { 
  checkBlock(block);
  synchronized(this){
    Map<Block, ActiveFile> m = ongoingCreates.get(namespaceId);
    return m != null ? m.remove(block) : null;
  }
}
Project: RDFS    File: VolumeMap.java
ActiveFile addOngoingCreates(int namespaceId, Block block, ActiveFile af) {
  checkBlock(block);
  synchronized(this){
    Map<Block, ActiveFile> m = ongoingCreates.get(namespaceId);
    return m.put(block, af);
  }
}
Project: RDFS    File: VolumeMap.java
/**
 * If there is an ActiveFile object for the block, create a copy of the
 * old one and replace the old one with it. This makes sure that updates
 * to the visible length of the old object have no impact on the copy in
 * the local map. That way, BlockReceiver can directly update the visible
 * length without holding the lock.
 * 
 * @param namespaceId
 * @param block
 * @throws CloneNotSupportedException 
 */
void copyOngoingCreates(int namespaceId, Block block) throws CloneNotSupportedException {
  checkBlock(block);
  synchronized(this){
    Map<Block, ActiveFile> m = ongoingCreates.get(namespaceId);
    ActiveFile af = m.get(block);
    if (af == null) {
      return;
    }

    m.put(block, af.getClone());
  }
}
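
The clone-and-replace step above is a small copy-on-write: once a fresh copy is swapped into the map, a writer still holding the old ActiveFile can update its visible length without racing against readers of the map. A self-contained sketch of the same idea (the Counter type is hypothetical):

import java.util.HashMap;
import java.util.Map;

// Copy-on-write sketch: replace the mapped object with a clone so the old
// instance can keep being mutated by its writer without affecting the map.
public class CopyOnWriteDemo {
  static class Counter implements Cloneable {
    long visibleLength;
    @Override public Counter clone() throws CloneNotSupportedException {
      return (Counter) super.clone();
    }
  }

  public static void main(String[] args) throws CloneNotSupportedException {
    Map<String, Counter> map = new HashMap<>();
    Counter original = new Counter();
    map.put("blk_1", original);

    synchronized (map) {
      map.put("blk_1", original.clone()); // the map now holds a snapshot
    }
    original.visibleLength = 42; // the writer updates its own copy, lock-free
    System.out.println(map.get("blk_1").visibleLength); // prints 0
  }
}
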
Project: hadoop-EAR    File: NamespaceMap.java
ActiveFile getOngoingCreates(Block block) {
  return ongoingCreates.get(block);
}
Project: hadoop-EAR    File: NamespaceMap.java
ActiveFile removeOngoingCreates(Block block) { 
  return ongoingCreates.remove(block);
}
Project: hadoop-EAR    File: NamespaceMap.java
ActiveFile addOngoingCreates(Block block, ActiveFile af) {
  return ongoingCreates.put(block, af);
}
Project: hadoop-on-lustre    File: TestBlockReportGeneration.java
private void fakeBeingCreated(Block b) {
  ongoingCreates.put(b,
      new ActiveFile(blockFile(b), new ArrayList<Thread>()));
}
Project: RDFS    File: FSDataset.java
ActiveFile(File f, List<Thread> list) throws IOException {
  this(f, list, UNKNOWN_SIZE);
}
Project: RDFS    File: FSDataset.java
public ActiveFile getClone() throws CloneNotSupportedException {
  return (ActiveFile) super.clone();
}
Project: RDFS    File: FSDataset.java
@Override
public BlockRecoveryInfo startBlockRecovery(int namespaceId, long blockId)
    throws IOException {
  Block stored = getStoredBlock(namespaceId, blockId, true);

  if (stored == null) {
    return null;
  }

  // It's important that this loop not be synchronized - otherwise
  // it will deadlock against the threads it is joining!
  while (true) {
    DataNode.LOG.debug(
        "Interrupting active writer threads for block " + stored);
    List<Thread> activeThreads = getActiveThreads(namespaceId, stored);
    if (activeThreads == null) break;
    if (interruptAndJoinThreads(activeThreads))
      break;
  }

  lock.readLock().lock();
  try {
    // now that writers are stopped, re-fetch the block's meta info
    stored = getStoredBlock(namespaceId, blockId, true);

    if (stored == null) {
      return null;
    }

    ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, stored);
    boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;

    BlockRecoveryInfo info = new BlockRecoveryInfo(stored, isRecovery);
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
                " length " + stored.getNumBytes() +
                " genstamp " + stored.getGenerationStamp());
    }

    // paranoia! verify that the contents of the stored block
    // match the block file on disk.
    validateBlockMetadata(namespaceId, stored);
    return info;
  } finally {
    lock.readLock().unlock();
  }
}
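
The comment inside startBlockRecovery is the key point: the writer threads being joined may themselves need the dataset lock, so joining while holding it would deadlock. The helper it calls might look like the following; this is a sketch inferred from the call site above, not the RDFS interruptAndJoinThreads implementation, and the timeout value is an assumption:

import java.util.List;

public final class WriterShutdown {
  // Interrupt each writer, then join it with a timeout. Must be called
  // WITHOUT holding any lock the writers may need, or the join can deadlock.
  // Returns true once every thread has exited; the caller re-fetches the
  // thread list and retries on false.
  static boolean interruptAndJoinThreads(List<Thread> threads) {
    for (Thread t : threads) {
      t.interrupt();
    }
    for (Thread t : threads) {
      try {
        t.join(10000); // wait up to 10s per thread (timeout is an assumption)
        if (t.isAlive()) {
          return false;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
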
Project: RDFS    File: VolumeMap.java
VolumeMap(int numNamespaces) {
  namespaceMap = new HashMap<Integer, Map<Block, DatanodeBlockInfo>>(numNamespaces);
  ongoingCreates = new HashMap<Integer, Map<Block, ActiveFile>>(numNamespaces);
}
Project: hortonworks-extension    File: TestBlockReportGeneration.java
private void fakeBeingCreated(Block b) {
  ongoingCreates.put(b,
      new ActiveFile(blockFile(b), new ArrayList<Thread>()));
}
Project: hadoop-EAR    File: NamespaceMap.java
/**
 * If there is an ActiveFile object for the block, create a copy of the
 * old one and replace the old one with it. This makes sure that updates
 * to the visible length of the old object have no impact on the copy in
 * the local map. That way, BlockReceiver can directly update the visible
 * length without holding the lock.
 * 
 * @param block
 * @throws CloneNotSupportedException 
 */
void copyOngoingCreates(Block block) throws CloneNotSupportedException {
  ActiveFile af = ongoingCreates.get(block);
  if (af == null) {
    return;
  }

  ongoingCreates.put(block, af.getClone());
}
Project: RDFS    File: FSDataset.java
/**
 * Adds a file to the ongoingCreates data structure to indicate that we are
 * creating a file.
 * 
 * @param dstNamespaceId
 *          namespace id for dstBlock
 * @param dstBlock
 *          the block that we are going to create
 * @param dstVol
 *          the volume on which the file is to be created.
 * @return the temporary file for the block
 * @throws IOException
 */
private File addToOngoingCreates(int dstNamespaceId, Block dstBlock,
    FSVolume dstVol) throws IOException {
  List<Thread> threads = null;
  // We do not want to create a BBW, hence treat this as a replication
  // request.
  File dstBlockFile = createTmpFile(dstNamespaceId, dstVol, dstBlock, true);
  volumeMap.addOngoingCreates(dstNamespaceId, dstBlock,
      new ActiveFile(dstBlockFile, threads));
  return dstBlockFile;
}
Project: RDFS    File: FSDataset.java
/**
 * Create an ActiveFile from a file on disk during DataNode startup.
 * This factory method exists just to make the purpose of this
 * constructor clear at call sites.
 * @throws IOException 
 */
public static ActiveFile createStartupRecoveryFile(File f)
    throws IOException {
  return new ActiveFile(f, true, UNKNOWN_SIZE);
}
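
The factory method exists purely to give the (File, boolean, long) constructor a self-documenting name at call sites; recoverBlocksBeingWritten above reads better with ActiveFile.createStartupRecoveryFile(f) than with an anonymous true flag. A generic illustration of the idiom (class and method names are hypothetical):

import java.io.File;

// Named factory methods make boolean constructor flags self-documenting.
public class Session {
  private final File file;
  private final boolean recovered;

  private Session(File f, boolean recovered) {
    this.file = f;
    this.recovered = recovered;
  }

  // Call sites read as intent, not as an opaque boolean argument.
  public static Session forStartupRecovery(File f) { return new Session(f, true); }
  public static Session forNewWrite(File f)        { return new Session(f, false); }
}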