@Override public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException { Block stored = getStoredBlock(blockId); if (stored == null) { return null; } // It's important that this loop not be synchronized - otherwise // this will deadlock against the thread it's joining against! while (true) { DataNode.LOG.debug( "Interrupting active writer threads for block " + stored); List<Thread> activeThreads = getActiveThreads(stored); if (activeThreads == null) break; if (interruptAndJoinThreads(activeThreads)) break; } synchronized (this) { ActiveFile activeFile = ongoingCreates.get(stored); boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup; BlockRecoveryInfo info = new BlockRecoveryInfo( stored, isRecovery); if (DataNode.LOG.isDebugEnabled()) { DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored + " length " + stored.getNumBytes() + " genstamp " + stored.getGenerationStamp()); } // paranoia! verify that the contents of the stored block // matches the block file on disk. validateBlockMetadata(stored); return info; } }
@Override public BlockRecoveryInfo startBlockRecovery(int namespaceId, long blockId) throws IOException { Block stored = getStoredBlock(namespaceId, blockId); return new BlockRecoveryInfo(stored, false); }
@Override public BlockRecoveryInfo startBlockRecovery(int namespaceId, long blockId) throws IOException { Block stored = getStoredBlock(namespaceId, blockId, true); if (stored == null) { return null; } // It's important that this loop not be synchronized - otherwise // this will deadlock against the thread it's joining against! while (true) { DataNode.LOG.debug( "Interrupting active writer threads for block " + stored); List<Thread> activeThreads = getActiveThreads(namespaceId, stored); if (activeThreads == null) break; if (interruptAndJoinThreads(activeThreads)) break; } lock.readLock().lock(); try { // now that writers are stopped, re-fetch the block's meta info stored = getStoredBlock(namespaceId, blockId, true); if (stored == null) { return null; } ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, stored); boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup; BlockRecoveryInfo info = new BlockRecoveryInfo(stored, isRecovery); if (DataNode.LOG.isDebugEnabled()) { DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored + " length " + stored.getNumBytes() + " genstamp " + stored.getGenerationStamp()); } // paranoia! verify that the contents of the stored block // matches the block file on disk. validateBlockMetadata(namespaceId, stored); return info; } finally { lock.readLock().unlock(); } }
@Override public BlockRecoveryInfo startBlockRecovery(int namespaceId, Block block) throws IOException { InjectionHandler.processEvent(InjectionEvent.DATANODE_BEFORE_RECOVERBLOCK, this); return data.startBlockRecovery(namespaceId, block.getBlockId()); }
public BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, BlockRecoveryInfo info) { this.id = id; this.datanode = datanode; this.info = info; }
@Override public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException { Block stored = getStoredBlock(blockId); return new BlockRecoveryInfo(stored, false); }
@Override public BlockRecoveryInfo startBlockRecovery(Block block) throws IOException { return data.startBlockRecovery(block.getBlockId()); }
BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, BlockRecoveryInfo info) { this.id = id; this.datanode = datanode; this.info = info; }
@Override public BlockRecoveryInfo startBlockRecovery(int namespaceId, Block block) throws IOException { return data.startBlockRecovery(namespaceId, block.getBlockId()); }
public BlockRecoveryInfo startBlockRecovery(int namespaceId, long blockId) throws IOException;
public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException;