Java class org.apache.hadoop.hdfs.server.datanode.ReplicaHandler — example source code

Project: hadoop    File: FsDatasetImpl.java
@Override  // FsDatasetSpi
public synchronized ReplicaHandler append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // This method is reached in two cases: either the block was successfully
  // finalized because all packets were processed at the DataNode, but the
  // acks for some packets never reached the client, so the client re-opens
  // the connection and retries sending those packets; or an "append" is
  // occurring to this block.

  // check the validity of the parameter
  if (newGS < b.getGenerationStamp()) {
    throw new IOException("The new generation stamp " + newGS + 
        " should be greater than the replica " + b + "'s generation stamp");
  }
  ReplicaInfo replicaInfo = getReplicaInfo(b);
  LOG.info("Appending to " + replicaInfo);
  if (replicaInfo.getState() != ReplicaState.FINALIZED) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (replicaInfo.getNumBytes() != expectedBlockLen) {
    throw new IOException("Corrupted replica " + replicaInfo + 
        " with a length of " + replicaInfo.getNumBytes() + 
        " expected length is " + expectedBlockLen);
  }

  FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
  ReplicaBeingWritten replica = null;
  try {
    replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
        b.getNumBytes());
  } catch (IOException e) {
    IOUtils.cleanup(null, ref);
    throw e;
  }
  return new ReplicaHandler(replica, ref);
}
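For context, a minimal caller-side sketch (illustrative, not from the project source) of how the returned ReplicaHandler is typically consumed: the handler pairs the replica with the FsVolumeReference obtained in append(), and closing it releases that reference. The class and method names below are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

class AppendUsageSketch {
  // dataset, block, newGS and expectedLen are assumed to come from the
  // surrounding DataNode context.
  static void appendAndRelease(FsDatasetSpi<?> dataset, ExtendedBlock block,
      long newGS, long expectedLen) throws IOException {
    ReplicaHandler handler = dataset.append(block, newGS, expectedLen);
    try {
      // handler.getReplica() exposes the replica being written; packets
      // would be streamed to it here.
    } finally {
      handler.close(); // releases the FsVolumeReference taken in append()
    }
  }
}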
Project: aliyun-oss-hadoop-fs    File: FsDatasetImpl.java
Project: big-c    File: FsDatasetImpl.java
Project: hadoop-2.6.0-cdh5.4.3    File: FsDatasetImpl.java
(All three repeat the hadoop append() implementation shown above verbatim.)
Project: hadoop-2.6.0-cdh5.4.3    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(
    StorageType storageType, ExtendedBlock b) throws IOException {
  ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
  if (replicaInfo != null) {
    if (replicaInfo.getGenerationStamp() < b.getGenerationStamp()
        && replicaInfo instanceof ReplicaInPipeline) {
      // Stop the previous writer
      ((ReplicaInPipeline) replicaInfo)
          .stopWriter(datanode.getDnConf().getXceiverStopTimeout());
      invalidate(b.getBlockPoolId(), new Block[]{replicaInfo});
    } else {
      throw new ReplicaAlreadyExistsException("Block " + b +
          " already exists in state " + replicaInfo.getState() +
          " and thus cannot be created.");
    }
  }

  FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes());
  FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
  // create a temporary file to hold block in the designated volume
  File f;
  try {
    f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
  } catch (IOException e) {
    IOUtils.cleanup(null, ref);
    throw e;
  }

  ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
      b.getGenerationStamp(), v, f.getParentFile(), 0);
  volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
  return new ReplicaHandler(newReplicaInfo, ref);
}
Project: hadoop    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public synchronized ReplicaHandler createRbw(
    StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
    throws IOException {
  ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
      b.getBlockId());
  if (replicaInfo != null) {
    throw new ReplicaAlreadyExistsException("Block " + b +
        " already exists in state " + replicaInfo.getState() +
        " and thus cannot be created.");
  }
  // create a new block
  FsVolumeReference ref;
  while (true) {
    try {
      if (allowLazyPersist) {
        // First try to place the block on a transient volume.
        ref = volumes.getNextTransientVolume(b.getNumBytes());
        datanode.getMetrics().incrRamDiskBlocksWrite();
      } else {
        ref = volumes.getNextVolume(storageType, b.getNumBytes());
      }
    } catch (DiskOutOfSpaceException de) {
      if (allowLazyPersist) {
        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
        allowLazyPersist = false;
        continue;
      }
      throw de;
    }
    break;
  }
  FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
  // create an rbw file to hold block in the designated volume
  File f;
  try {
    f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
  } catch (IOException e) {
    IOUtils.cleanup(null, ref);
    throw e;
  }

  ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
      b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
  volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
  return new ReplicaHandler(newReplicaInfo, ref);
}
Project: hadoop    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public synchronized ReplicaHandler recoverRbw(
    ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
    throws IOException {
  LOG.info("Recover RBW replica " + b);

  ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());

  // check the replica's state
  if (replicaInfo.getState() != ReplicaState.RBW) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
  }
  ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;

  LOG.info("Recovering " + rbw);

  // Stop the previous writer
  rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
  rbw.setWriter(Thread.currentThread());

  // check generation stamp
  long replicaGenerationStamp = rbw.getGenerationStamp();
  if (replicaGenerationStamp < b.getGenerationStamp() ||
      replicaGenerationStamp > newGS) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
        ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
        newGS + "].");
  }

  // check replica length
  long bytesAcked = rbw.getBytesAcked();
  long numBytes = rbw.getNumBytes();
  if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
    throw new ReplicaNotFoundException("Unmatched length replica " + 
        replicaInfo + ": BytesAcked = " + bytesAcked + 
        " BytesRcvd = " + numBytes + " are not in the range of [" + 
        minBytesRcvd + ", " + maxBytesRcvd + "].");
  }

  FsVolumeReference ref = rbw.getVolume().obtainReference();
  try {
    // Truncate the potentially corrupt portion.
    // If the source was a client and the last node in the pipeline was lost,
    // any corrupt data written after the acked length can go unnoticed.
    if (numBytes > bytesAcked) {
      final File replicafile = rbw.getBlockFile();
      truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
      rbw.setNumBytes(bytesAcked);
      rbw.setLastChecksumAndDataLen(bytesAcked, null);
    }

    // bump the replica's generation stamp to newGS
    bumpReplicaGS(rbw, newGS);
  } catch (IOException e) {
    IOUtils.cleanup(null, ref);
    throw e;
  }
  return new ReplicaHandler(rbw, ref);
}
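To make the length rules above concrete, a standalone sketch with made-up numbers (not project code): a recovering replica passes the check only if bytesAcked >= minBytesRcvd and numBytes <= maxBytesRcvd, and anything past the acked length is then truncated away.

public class RecoverRbwLengthSketch {
  public static void main(String[] args) {
    long minBytesRcvd = 0, maxBytesRcvd = 2048; // bounds from the recovering client
    long bytesAcked = 1024; // acknowledged by the downstream pipeline
    long numBytes = 1536;   // on disk; bytes past the ack may be corrupt

    // The replica is acceptable only inside the [min, max] window.
    boolean inRange = bytesAcked >= minBytesRcvd && numBytes <= maxBytesRcvd;
    System.out.println("in range: " + inRange); // true

    // recoverRbw truncates back to the acked length before bumping the GS;
    // the assignment below stands in for truncateBlock(...).
    if (numBytes > bytesAcked) {
      numBytes = bytesAcked;
    }
    System.out.println("recovered length: " + numBytes); // 1024
  }
}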
Project: hadoop    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public ReplicaHandler createTemporary(
    StorageType storageType, ExtendedBlock b) throws IOException {
  long startTimeMs = Time.monotonicNow();
  long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
  ReplicaInfo lastFoundReplicaInfo = null;
  do {
    synchronized (this) {
      ReplicaInfo currentReplicaInfo =
          volumeMap.get(b.getBlockPoolId(), b.getBlockId());
      if (currentReplicaInfo == lastFoundReplicaInfo) {
        if (lastFoundReplicaInfo != null) {
          invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
        }
        FsVolumeReference ref =
            volumes.getNextVolume(storageType, b.getNumBytes());
        FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
        // create a temporary file to hold block in the designated volume
        File f;
        try {
          f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
        } catch (IOException e) {
          IOUtils.cleanup(null, ref);
          throw e;
        }
        ReplicaInPipeline newReplicaInfo =
            new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
                f.getParentFile(), 0);
        volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
        return new ReplicaHandler(newReplicaInfo, ref);
      } else {
        if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp()
            && currentReplicaInfo instanceof ReplicaInPipeline)) {
          throw new ReplicaAlreadyExistsException("Block " + b
              + " already exists in state " + currentReplicaInfo.getState()
              + " and thus cannot be created.");
        }
        lastFoundReplicaInfo = currentReplicaInfo;
      }
    }

    // Hang too long, just bail out. This is not supposed to happen.
    long writerStopMs = Time.monotonicNow() - startTimeMs;
    if (writerStopMs > writerStopTimeoutMs) {
      LOG.warn("Unable to stop existing writer for block " + b + " after " 
          + writerStopMs + " miniseconds.");
      throw new IOException("Unable to stop existing writer for block " + b
          + " after " + writerStopMs + " miniseconds.");
    }

    // Stop the previous writer
    ((ReplicaInPipeline) lastFoundReplicaInfo)
        .stopWriter(writerStopTimeoutMs);
  } while (true);
}
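Every variant above ends by wrapping the replica and the volume reference in a ReplicaHandler. A simplified sketch of what that pairing looks like (based on the upstream class; the stand-in types below are illustrative, not the real field types, and details may differ across the versions shown here):

import java.io.Closeable;
import java.io.IOException;

class ReplicaHandlerSketch implements Closeable {
  private final Object replica;      // stands in for the in-pipeline replica
  private final Closeable volumeRef; // stands in for FsVolumeReference

  ReplicaHandlerSketch(Object replica, Closeable volumeRef) {
    this.replica = replica;
    this.volumeRef = volumeRef;
  }

  Object getReplica() {
    return replica;
  }

  @Override
  public void close() throws IOException {
    if (volumeRef != null) {
      volumeRef.close(); // un-pins the volume so it can be removed or swapped
    }
  }
}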
Project: aliyun-oss-hadoop-fs    File: FsDatasetImpl.java
@Override // FsDatasetSpi
public synchronized ReplicaHandler createRbw(
    StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
    throws IOException {
  ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
      b.getBlockId());
  if (replicaInfo != null) {
    throw new ReplicaAlreadyExistsException("Block " + b +
        " already exists in state " + replicaInfo.getState() +
        " and thus cannot be created.");
  }
  // create a new block
  FsVolumeReference ref = null;

  // Use ramdisk only if block size is a multiple of OS page size.
  // This simplifies reservation for partially used replicas
  // significantly.
  if (allowLazyPersist &&
      lazyWriter != null &&
      b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
      reserveLockedMemory(b.getNumBytes())) {
    try {
      // First try to place the block on a transient volume.
      ref = volumes.getNextTransientVolume(b.getNumBytes());
      datanode.getMetrics().incrRamDiskBlocksWrite();
    } catch(DiskOutOfSpaceException de) {
      // Ignore the exception since we just fall back to persistent storage.
    } finally {
      if (ref == null) {
        cacheManager.release(b.getNumBytes());
      }
    }
  }

  if (ref == null) {
    ref = volumes.getNextVolume(storageType, b.getNumBytes());
  }

  FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
  // create an rbw file to hold block in the designated volume

  if (allowLazyPersist && !v.isTransientStorage()) {
    datanode.getMetrics().incrRamDiskBlocksWriteFallback();
  }

  File f;
  try {
    f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
  } catch (IOException e) {
    IOUtils.cleanup(null, ref);
    throw e;
  }

  ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
      b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
  volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
  return new ReplicaHandler(newReplicaInfo, ref);
}
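The page-size test above is just a modulus check; a standalone illustration with assumed values (the real page size comes from cacheManager.getOsPageSize()):

public class PageAlignedSketch {
  public static void main(String[] args) {
    long osPageSize = 4096;            // assumed 4 KB pages
    long aligned = 128L * 1024 * 1024; // 128 MB block: a whole number of pages
    long unaligned = aligned + 100;    // not a page multiple

    System.out.println(aligned % osPageSize == 0);   // true  -> RAM disk eligible
    System.out.println(unaligned % osPageSize == 0); // false -> falls back to disk
  }
}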
Project: aliyun-oss-hadoop-fs    File: FsDatasetImpl.java
(recoverRbw() is identical to the hadoop version shown above, and the loop-based createTemporary() differs only in that the new ReplicaInPipeline starts with a length of b.getLocalBlock().getNumBytes() instead of 0.)
Project: big-c    File: FsDatasetImpl.java
(createRbw(), recoverRbw(), and the loop-based createTemporary() repeat the hadoop versions shown above verbatim.)
Project: hadoop-2.6.0-cdh5.4.3    File: FsDatasetImpl.java
(createRbw() and recoverRbw() likewise repeat the hadoop versions shown above verbatim.)
Project: hadoop    File: FsDatasetSpi.java
/**
 * Creates a temporary replica and returns the meta information of the replica.
 * 
 * @param b block
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
public ReplicaHandler createTemporary(StorageType storageType,
    ExtendedBlock b) throws IOException;
Project: hadoop    File: FsDatasetSpi.java
/**
 * Creates an RBW replica and returns the meta info of the replica.
 * 
 * @param b block
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
public ReplicaHandler createRbw(StorageType storageType,
    ExtendedBlock b, boolean allowLazyPersist) throws IOException;
Project: hadoop    File: FsDatasetSpi.java
/**
 * Recovers an RBW replica and returns the meta info of the replica.
 * 
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param minBytesRcvd the minimum number of bytes that the replica could have
 * @param maxBytesRcvd the maximum number of bytes that the replica could have
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
public ReplicaHandler recoverRbw(ExtendedBlock b,
    long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException;
Project: hadoop    File: FsDatasetSpi.java
/**
 * Appends to a finalized replica and returns the meta info of the replica.
 * 
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param expectedBlockLen the number of bytes the replica is expected to have
 * @return the meta info of the replica which is being written to
 * @throws IOException
 */
public ReplicaHandler append(ExtendedBlock b, long newGS,
    long expectedBlockLen) throws IOException;
Project: hadoop    File: FsDatasetSpi.java
/**
 * Recovers a failed append to a finalized replica
 * and returns the meta info of the replica.
 * 
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param expectedBlockLen the number of bytes the replica is expected to have
 * @return the meta info of the replica which is being written to
 * @throws IOException
 */
public ReplicaHandler recoverAppend(
    ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;
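A hedged usage sketch of the SPI write path declared above (not project code; StorageType.DEFAULT and the helper name are assumptions): create an RBW replica, write to it, and close the handler so the volume reference is always released.

import java.io.IOException;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

class RbwWriteSketch {
  // dataset and block are assumed to be supplied by the DataNode context.
  static void writeNewRbw(FsDatasetSpi<?> dataset, ExtendedBlock block)
      throws IOException {
    ReplicaHandler handler =
        dataset.createRbw(StorageType.DEFAULT, block, false /* no lazy persist */);
    try {
      // stream the block's packets to the replica behind the handler here
    } finally {
      handler.close(); // release the volume reference even on failure
    }
  }
}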
Project: aliyun-oss-hadoop-fs    File: FsDatasetSpi.java
(The same five declarations as in the hadoop project above, minus the redundant public modifier on the interface methods.)
Project: big-c    File: FsDatasetSpi.java
Project: hadoop-2.6.0-cdh5.4.3    File: FsDatasetSpi.java
(Both repeat the hadoop FsDatasetSpi declarations shown above verbatim.)