@Override // FsDatasetSpi public synchronized ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client // re-opens the connection and retries sending those packets. // The other reason is that an "append" is occurring to this block. // check the validity of the parameter if (newGS < b.getGenerationStamp()) { throw new IOException("The new generation stamp " + newGS + " should be greater than the replica " + b + "'s generation stamp"); } ReplicaInfo replicaInfo = getReplicaInfo(b); LOG.info("Appending to " + replicaInfo); if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new ReplicaNotFoundException( ReplicaNotFoundException.UNFINALIZED_REPLICA + b); } if (replicaInfo.getNumBytes() != expectedBlockLen) { throw new IOException("Corrupted replica " + replicaInfo + " with a length of " + replicaInfo.getNumBytes() + " expected length is " + expectedBlockLen); } FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); ReplicaBeingWritten replica = null; try { replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS, b.getNumBytes()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } return new ReplicaHandler(replica, ref); }
@Override // FsDatasetSpi public synchronized ReplicaHandler createTemporary( StorageType storageType, ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { if (replicaInfo.getGenerationStamp() < b.getGenerationStamp() && replicaInfo instanceof ReplicaInPipeline) { // Stop the previous writer ((ReplicaInPipeline)replicaInfo) .stopWriter(datanode.getDnConf().getXceiverStopTimeout()); invalidate(b.getBlockPoolId(), new Block[]{replicaInfo}); } else { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } } FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create a temporary file to hold block in the designated volume File f; try { f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), 0); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); }
@Override // FsDatasetSpi public synchronized ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } // create a new block FsVolumeReference ref; while (true) { try { if (allowLazyPersist) { // First try to place the block on a transient volume. ref = volumes.getNextTransientVolume(b.getNumBytes()); datanode.getMetrics().incrRamDiskBlocksWrite(); } else { ref = volumes.getNextVolume(storageType, b.getNumBytes()); } } catch (DiskOutOfSpaceException de) { if (allowLazyPersist) { datanode.getMetrics().incrRamDiskBlocksWriteFallback(); allowLazyPersist = false; continue; } throw de; } break; } FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume File f; try { f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); }
@Override // FsDatasetSpi public synchronized ReplicaHandler recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { LOG.info("Recover RBW replica " + b); ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state if (replicaInfo.getState() != ReplicaState.RBW) { throw new ReplicaNotFoundException( ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); } ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; LOG.info("Recovering " + rbw); // Stop the previous writer rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); rbw.setWriter(Thread.currentThread()); // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || replicaGenerationStamp > newGS) { throw new ReplicaNotFoundException( ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b + ". Expected GS range is [" + b.getGenerationStamp() + ", " + newGS + "]."); } // check replica length long bytesAcked = rbw.getBytesAcked(); long numBytes = rbw.getNumBytes(); if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){ throw new ReplicaNotFoundException("Unmatched length replica " + replicaInfo + ": BytesAcked = " + bytesAcked + " BytesRcvd = " + numBytes + " are not in the range of [" + minBytesRcvd + ", " + maxBytesRcvd + "]."); } FsVolumeReference ref = rbw.getVolume().obtainReference(); try { // Truncate the potentially corrupt portion. // If the source was client and the last node in the pipeline was lost, // any corrupt data written after the acked length can go unnoticed. 
if (numBytes > bytesAcked) { final File replicafile = rbw.getBlockFile(); truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked); rbw.setNumBytes(bytesAcked); rbw.setLastChecksumAndDataLen(bytesAcked, null); } // bump the replica's generation stamp to newGS bumpReplicaGS(rbw, newGS); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } return new ReplicaHandler(rbw, ref); }
@Override // FsDatasetSpi public ReplicaHandler createTemporary( StorageType storageType, ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); ReplicaInfo lastFoundReplicaInfo = null; do { synchronized (this) { ReplicaInfo currentReplicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (currentReplicaInfo == lastFoundReplicaInfo) { if (lastFoundReplicaInfo != null) { invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); } FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create a temporary file to hold block in the designated volume File f; try { f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), 0); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); } else { if (!(currentReplicaInfo.getGenerationStamp() < b .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + currentReplicaInfo.getState() + " and thus cannot be created."); } lastFoundReplicaInfo = currentReplicaInfo; } } // Hang too long, just bail out. This is not supposed to happen. long writerStopMs = Time.monotonicNow() - startTimeMs; if (writerStopMs > writerStopTimeoutMs) { LOG.warn("Unable to stop existing writer for block " + b + " after " + writerStopMs + " miniseconds."); throw new IOException("Unable to stop existing writer for block " + b + " after " + writerStopMs + " miniseconds."); } // Stop the previous writer ((ReplicaInPipeline) lastFoundReplicaInfo) .stopWriter(writerStopTimeoutMs); } while (true); }
@Override // FsDatasetSpi public synchronized ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } // create a new block FsVolumeReference ref = null; // Use ramdisk only if block size is a multiple of OS page size. // This simplifies reservation for partially used replicas // significantly. if (allowLazyPersist && lazyWriter != null && b.getNumBytes() % cacheManager.getOsPageSize() == 0 && reserveLockedMemory(b.getNumBytes())) { try { // First try to place the block on a transient volume. ref = volumes.getNextTransientVolume(b.getNumBytes()); datanode.getMetrics().incrRamDiskBlocksWrite(); } catch(DiskOutOfSpaceException de) { // Ignore the exception since we just fall back to persistent storage. } finally { if (ref == null) { cacheManager.release(b.getNumBytes()); } } } if (ref == null) { ref = volumes.getNextVolume(storageType, b.getNumBytes()); } FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume if (allowLazyPersist && !v.isTransientStorage()) { datanode.getMetrics().incrRamDiskBlocksWriteFallback(); } File f; try { f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); }
@Override // FsDatasetSpi public ReplicaHandler createTemporary( StorageType storageType, ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); ReplicaInfo lastFoundReplicaInfo = null; do { synchronized (this) { ReplicaInfo currentReplicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (currentReplicaInfo == lastFoundReplicaInfo) { if (lastFoundReplicaInfo != null) { invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); } FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create a temporary file to hold block in the designated volume File f; try { f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getLocalBlock().getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); } else { if (!(currentReplicaInfo.getGenerationStamp() < b .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + currentReplicaInfo.getState() + " and thus cannot be created."); } lastFoundReplicaInfo = currentReplicaInfo; } } // Hang too long, just bail out. This is not supposed to happen. long writerStopMs = Time.monotonicNow() - startTimeMs; if (writerStopMs > writerStopTimeoutMs) { LOG.warn("Unable to stop existing writer for block " + b + " after " + writerStopMs + " miniseconds."); throw new IOException("Unable to stop existing writer for block " + b + " after " + writerStopMs + " miniseconds."); } // Stop the previous writer ((ReplicaInPipeline) lastFoundReplicaInfo) .stopWriter(writerStopTimeoutMs); } while (true); }
/**
 * Creates a temporary replica and returns the meta information of the
 * replica.
 *
 * @param storageType the storage type requested for the replica
 * @param b block
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
public ReplicaHandler createTemporary(StorageType storageType, ExtendedBlock b) throws IOException;
/**
 * Creates a RBW replica and returns the meta info of the replica.
 *
 * @param storageType the storage type requested for the replica
 * @param b block
 * @param allowLazyPersist whether lazy persist is allowed for this replica
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
public ReplicaHandler createRbw(StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
 * Recovers a RBW replica and returns the meta info of the replica.
 *
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param minBytesRcvd the minimum number of bytes that the replica could have
 * @param maxBytesRcvd the maximum number of bytes that the replica could have
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException;
/**
 * Appends to a finalized replica and returns the meta info of the replica.
 *
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param expectedBlockLen the number of bytes the replica is expected to have
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
public ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;
/**
 * Recovers a failed append to a finalized replica and returns the meta info
 * of the replica.
 *
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param expectedBlockLen the number of bytes the replica is expected to have
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
public ReplicaHandler recoverAppend( ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;
/**
 * Creates a temporary replica and returns the meta information of the
 * replica.
 *
 * @param storageType the storage type requested for the replica
 * @param b block
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
ReplicaHandler createTemporary(StorageType storageType, ExtendedBlock b) throws IOException;
/**
 * Creates a RBW replica and returns the meta info of the replica.
 *
 * @param storageType the storage type requested for the replica
 * @param b block
 * @param allowLazyPersist whether lazy persist is allowed for this replica
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
ReplicaHandler createRbw(StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
 * Recovers a RBW replica and returns the meta info of the replica.
 *
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param minBytesRcvd the minimum number of bytes that the replica could have
 * @param maxBytesRcvd the maximum number of bytes that the replica could have
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
ReplicaHandler recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException;
/**
 * Appends to a finalized replica and returns the meta info of the replica.
 *
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param expectedBlockLen the number of bytes the replica is expected to have
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;
/**
 * Recovers a failed append to a finalized replica and returns the meta
 * info of the replica.
 *
 * @param b block
 * @param newGS the new generation stamp for the replica
 * @param expectedBlockLen the number of bytes the replica is expected to have
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
ReplicaHandler recoverAppend( ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;