@Override public void onFailLazyPersist(String bpId, long blockId) { RamDiskReplica block = null; block = ramDiskReplicaTracker.getReplica(bpId, blockId); if (block != null) { LOG.warn("Failed to save replica " + block + ". re-enqueueing it."); ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block); } }
/** * Checkpoint a pending replica to persistent storage now. * If we fail then move the replica to the end of the queue. * @return true if there is more work to be done, false otherwise. */ private boolean saveNextReplica() { RamDiskReplica block = null; FsVolumeReference targetReference; FsVolumeImpl targetVolume; ReplicaInfo replicaInfo; boolean succeeded = false; try { block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); if (block != null) { synchronized (FsDatasetImpl.this) { replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); // If replicaInfo is null, the block was either deleted before // it could be checkpointed or it is already on persistent storage. // This can occur if a second replica on persistent storage was found // after the lazy write was scheduled. if (replicaInfo != null && replicaInfo.getVolume().isTransientStorage()) { // Pick a target volume to persist the block. targetReference = volumes.getNextVolume( StorageType.DEFAULT, replicaInfo.getNumBytes()); targetVolume = (FsVolumeImpl) targetReference.getVolume(); ramDiskReplicaTracker.recordStartLazyPersist( block.getBlockPoolId(), block.getBlockId(), targetVolume); if (LOG.isDebugEnabled()) { LOG.debug("LazyWriter: Start persisting RamDisk block:" + " block pool Id: " + block.getBlockPoolId() + " block id: " + block.getBlockId() + " on target volume " + targetVolume); } asyncLazyPersistService.submitLazyPersistTask( block.getBlockPoolId(), block.getBlockId(), replicaInfo.getGenerationStamp(), block.getCreationTime(), replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), targetReference); } } } succeeded = true; } catch(IOException ioe) { LOG.warn("Exception saving replica " + block, ioe); } finally { if (!succeeded && block != null) { LOG.warn("Failed to save replica " + block + ". re-enqueueing it."); onFailLazyPersist(block.getBlockPoolId(), block.getBlockId()); } } return succeeded; }