/**
 * Tries to open a file for append, retrying while block recovery is in
 * progress.
 *
 * <p>Makes up to 10 attempts. When {@code fs.append(p)} fails with a
 * {@link RemoteException} whose class name equals that of
 * {@link RecoveryInProgressException}, the failure is logged and the call is
 * retried after a one second sleep. Any other {@link RemoteException} is
 * rethrown immediately.
 *
 * @param fs the file system to append through
 * @param p the path of the file to open for append
 * @return the stream returned by {@code fs.append(p)}
 * @throws Exception if a non-retriable error occurs, the sleep is
 *     interrupted, or every attempt fails (in which case an
 *     {@link IOException} carrying the last failure as its cause is thrown)
 */
private static FSDataOutputStream append(FileSystem fs, Path p)
    throws Exception {
  final String retriableClassName =
      RecoveryInProgressException.class.getName();
  // Remember the most recent retriable failure so it is not lost when we
  // finally give up.
  RemoteException lastFailure = null;
  for (int i = 0; i < 10; i++) {
    try {
      return fs.append(p);
    } catch (RemoteException re) {
      // Compare from the known non-null side so a missing class name is
      // rethrown as the original exception rather than masked by an NPE.
      if (!retriableClassName.equals(re.getClassName())) {
        throw re;
      }
      lastFailure = re;
      AppendTestUtil.LOG.info("Will sleep and retry, i=" + i + ", p=" + p, re);
      Thread.sleep(1000);
    }
  }
  throw new IOException("Cannot append to " + p, lastFailure);
}
/**
 * BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
 *
 * <p>Stubs {@code spyDN.initReplicaRecovery} to throw a
 * {@link RecoveryInProgressException}, runs a spied recovery task for each
 * recovering block and verifies that {@code syncBlock} is never invoked on it.
 *
 * @throws IOException
 *           in case of an error
 */
@Test
public void testRecoveryInProgressException()
    throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  doThrow(new RecoveryInProgressException("Replica recovery is in progress"))
      .when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  for (RecoveringBlock recoveringBlock : initRecoveringBlocks()) {
    BlockRecoveryWorker.RecoveryTaskContiguous spyTask =
        spy(recoveryWorker.new RecoveryTaskContiguous(recoveringBlock));
    spyTask.recover();
    verify(spyTask, never()).syncBlock(anyListOf(BlockRecord.class));
  }
}
/**
 * Tries to open a file for append, retrying while a non-aborting block
 * recovery is in progress.
 *
 * <p>Makes up to 10 attempts. When {@code fs.append(p)} fails with a
 * {@link RemoteException} whose class name equals that of
 * {@code RecoveryInProgressException.NonAbortingRecoveryInProgressException},
 * the failure is logged and the call is retried after a one second sleep.
 * Any other {@link RemoteException} is rethrown immediately.
 *
 * @param fs the file system to append through
 * @param p the path of the file to open for append
 * @return the stream returned by {@code fs.append(p)}
 * @throws Exception if a non-retriable error occurs, the sleep is
 *     interrupted, or every attempt fails (in which case an
 *     {@link IOException} carrying the last failure as its cause is thrown)
 */
private static FSDataOutputStream append(FileSystem fs, Path p)
    throws Exception {
  final String retriableClassName =
      RecoveryInProgressException.NonAbortingRecoveryInProgressException.class
          .getName();
  // Remember the most recent retriable failure so it is not lost when we
  // finally give up.
  RemoteException lastFailure = null;
  for (int i = 0; i < 10; i++) {
    try {
      return fs.append(p);
    } catch (RemoteException re) {
      // Compare from the known non-null side so a missing class name is
      // rethrown as the original exception rather than masked by an NPE.
      if (!retriableClassName.equals(re.getClassName())) {
        throw re;
      }
      lastFailure = re;
      AppendTestUtil.LOG
          .info("Will sleep and retry, i=" + i + ", p=" + p, re);
      Thread.sleep(1000);
    }
  }
  throw new IOException("Cannot append to " + p, lastFailure);
}
/**
 * BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
 *
 * <p>Spies on the data node, makes {@code initReplicaRecovery} throw a
 * {@link RecoveryInProgressException}, runs block recovery to completion and
 * verifies that {@code syncBlock} was never invoked.
 *
 * <p>Exceptions raised while setting up or running the recovery are allowed
 * to propagate so that they fail the test instead of being printed and
 * ignored.
 *
 * @throws IOException
 *           in case of an error
 * @throws InterruptedException
 *           if the wait on the recovery daemon is interrupted
 */
@Test
public void testRecoveryInProgressException()
    throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }
  DataNode spyDN = spy(dn);
  doThrow(new RecoveryInProgressException("Replica recovery is in progress"))
      .when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
  Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  d.join();
  verify(spyDN, never())
      .syncBlock(any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
/**
 * BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
 *
 * <p>Spies on the data node, makes {@code initReplicaRecovery} throw a
 * {@link RecoveryInProgressException}, runs block recovery for the
 * "fake NN" target and verifies that {@code syncBlock} was never invoked.
 *
 * @throws IOException
 *           in case of an error
 */
@Test
public void testRecoveryInProgressException()
    throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final DataNode spyDN = spy(dn);
  doThrow(new RecoveryInProgressException("Replica recovery is in progress"))
      .when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recoveryDaemon =
      spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  recoveryDaemon.join();

  verify(spyDN, never())
      .syncBlock(any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
/**
 * Decides whether the given exception should abort.
 *
 * @param e the exception to classify
 * @return {@code false} only when {@code e} is a
 *     {@code RecoveryInProgressException.NonAbortingRecoveryInProgressException};
 *     {@code true} for everything else, including {@code null}
 */
@Override
protected final boolean shouldAbort(Exception e) {
  return !(e
      instanceof RecoveryInProgressException.NonAbortingRecoveryInProgressException);
}
/**
 * BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
 *
 * <p>Spies on the data node, makes {@code initReplicaRecovery} throw a
 * {@link RecoveryInProgressException}, runs block recovery and verifies that
 * {@code syncBlock} was never invoked.
 *
 * @throws IOException
 *           in case of an error
 */
@Test
public void testRecoveryInProgressException()
    throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final DataNode spyDN = spy(dn);
  doThrow(new RecoveryInProgressException("Replica recovery is in progress"))
      .when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recoveryDaemon = spyDN.recoverBlocks(initRecoveringBlocks());
  recoveryDaemon.join();

  verify(spyDN, never())
      .syncBlock(any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
/** Recover a block */ private void recoverBlock(RecoveringBlock rBlock) throws IOException { ExtendedBlock block = rBlock.getBlock(); String blookPoolId = block.getBlockPoolId(); DatanodeID[] datanodeids = rBlock.getLocations(); List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length); int errorCount = 0; //check generation stamps for(DatanodeID id : datanodeids) { try { BPOfferService bpos = blockPoolManager.get(blookPoolId); DatanodeRegistration bpReg = bpos.bpRegistration; InterDatanodeProtocol datanode = bpReg.equals(id)? this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), dnConf.socketTimeout, dnConf.connectToDnViaHostname); ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); if (info != null && info.getGenerationStamp() >= block.getGenerationStamp() && info.getNumBytes() > 0) { syncList.add(new BlockRecord(id, datanode, info)); } } catch (RecoveryInProgressException ripE) { InterDatanodeProtocol.LOG.warn( "Recovery for replica " + block + " on data-node " + id + " is already in progress. Recovery id = " + rBlock.getNewGenerationStamp() + " is aborted.", ripE); return; } catch (IOException e) { ++errorCount; InterDatanodeProtocol.LOG.warn( "Failed to obtain replica info for block (=" + block + ") from datanode (=" + id + ")", e); } } if (errorCount == datanodeids.length) { throw new IOException("All datanodes failed: block=" + block + ", datanodeids=" + Arrays.asList(datanodeids)); } syncBlock(rBlock, syncList); }
protected void recover() throws IOException { List<BlockRecord> syncList = new ArrayList<>(locs.length); int errorCount = 0; int candidateReplicaCnt = 0; // Check generation stamps, replica size and state. Replica must satisfy // the following criteria to be included in syncList for recovery: // - Valid generation stamp // - Non-zero length // - Original state is RWR or better for(DatanodeID id : locs) { try { DatanodeID bpReg = new DatanodeID( datanode.getBPOfferService(bpid).bpRegistration); InterDatanodeProtocol proxyDN = bpReg.equals(id)? datanode: DataNode.createInterDataNodeProtocolProxy(id, conf, dnConf.socketTimeout, dnConf.connectToDnViaHostname); ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN, rBlock); if (info != null && info.getGenerationStamp() >= block.getGenerationStamp() && info.getNumBytes() > 0) { // Count the number of candidate replicas received. ++candidateReplicaCnt; if (info.getOriginalReplicaState().getValue() <= ReplicaState.RWR.getValue()) { syncList.add(new BlockRecord(id, proxyDN, info)); } else { if (LOG.isDebugEnabled()) { LOG.debug("Block recovery: Ignored replica with invalid " + "original state: " + info + " from DataNode: " + id); } } } else { if (LOG.isDebugEnabled()) { if (info == null) { LOG.debug("Block recovery: DataNode: " + id + " does not have " + "replica for block: " + block); } else { LOG.debug("Block recovery: Ignored replica with invalid " + "generation stamp or length: " + info + " from " + "DataNode: " + id); } } } } catch (RecoveryInProgressException ripE) { InterDatanodeProtocol.LOG.warn( "Recovery for replica " + block + " on data-node " + id + " is already in progress. 
Recovery id = " + rBlock.getNewGenerationStamp() + " is aborted.", ripE); return; } catch (IOException e) { ++errorCount; InterDatanodeProtocol.LOG.warn( "Failed to obtain replica info for block (=" + block + ") from datanode (=" + id + ")", e); } } if (errorCount == locs.length) { throw new IOException("All datanodes failed: block=" + block + ", datanodeids=" + Arrays.asList(locs)); } // None of the replicas reported by DataNodes has the required original // state, report the error. if (candidateReplicaCnt > 0 && syncList.isEmpty()) { throw new IOException("Found " + candidateReplicaCnt + " replica(s) for block " + block + " but none is in " + ReplicaState.RWR.name() + " or better state. datanodeids=" + Arrays.asList(locs)); } syncBlock(syncList); }
/** * Recover a block */ private void recoverBlock(RecoveringBlock rBlock) throws IOException { ExtendedBlock block = rBlock.getBlock(); String blookPoolId = block.getBlockPoolId(); DatanodeID[] datanodeids = rBlock.getLocations(); List<BlockRecord> syncList = new ArrayList<>(datanodeids.length); int errorCount = 0; //check generation stamps for (DatanodeID id : datanodeids) { try { BPOfferService bpos = blockPoolManager.get(blookPoolId); DatanodeRegistration bpReg = bpos.bpRegistration; InterDatanodeProtocol datanode = bpReg.equals(id) ? this : DataNode .createInterDataNodeProtocolProxy(id, getConf(), dnConf.socketTimeout, dnConf.connectToDnViaHostname); ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); if (info != null && info.getGenerationStamp() >= block.getGenerationStamp() && info.getNumBytes() > 0) { syncList.add(new BlockRecord(id, datanode, info)); } } catch (RecoveryInProgressException ripE) { InterDatanodeProtocol.LOG.warn( "Recovery for replica " + block + " on data-node " + id + " is already in progress. Recovery id = " + rBlock.getNewGenerationStamp() + " is aborted.", ripE); return; } catch (IOException e) { ++errorCount; InterDatanodeProtocol.LOG.warn( "Failed to obtain replica info for block (=" + block + ") from datanode (=" + id + ")", e); } } if (errorCount == datanodeids.length) { throw new IOException( "All datanodes failed: block=" + block + ", datanodeids=" + Arrays.asList(datanodeids)); } syncBlock(rBlock, syncList); }
/**
 * static version of {@link #initReplicaRecovery(Block, long)}.
 *
 * <p>Looks the replica up in {@code map}, stops its writer when it is a
 * {@code ReplicaInPipeline}, validates its on-disk length, generation stamp
 * and recovery id, and then either updates the recovery id of an existing
 * replica under recovery or registers a new {@code ReplicaUnderRecovery} in
 * {@code map}.
 *
 * @param map the replica map to look up and update
 * @param block the block being recovered
 * @param recoveryId the id of this recovery attempt
 * @return the recovery info of the replica, or {@code null} when
 *     {@code map} holds no replica for the block
 * @throws RecoveryInProgressException if the replica is already under a
 *     recovery whose id is not lower than {@code recoveryId}
 * @throws IOException if any of the consistency checks fails
 */
static ReplicaRecoveryInfo initReplicaRecovery(
    ReplicasMap map, Block block, long recoveryId) throws IOException {
  final ReplicaInfo replica = map.get(block.getBlockId());
  DataNode.LOG.info("initReplicaRecovery: block=" + block
      + ", recoveryId=" + recoveryId + ", replica=" + replica);

  // Nothing to recover when the replica is unknown.
  if (replica == null) {
    return null;
  }

  // Stop the writer, if there is one, before inspecting the replica.
  if (replica instanceof ReplicaInPipeline) {
    final ReplicaInPipeline pipelineReplica = (ReplicaInPipeline) replica;
    pipelineReplica.stopWriter();

    // Bytes on disk must cover the visible length.
    if (pipelineReplica.getBytesOnDisk()
        < pipelineReplica.getVisibleLength()) {
      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
          + " getBytesOnDisk() < getVisibleLength(), rip="
          + pipelineReplica);
    }

    // Check the replica's files.
    checkReplicaFiles(pipelineReplica);
  }

  // The replica must not be older than the block being recovered.
  if (replica.getGenerationStamp() < block.getGenerationStamp()) {
    throw new IOException(
        "replica.getGenerationStamp() < block.getGenerationStamp(), block="
            + block + ", replica=" + replica);
  }

  // The recovery id must be newer than the replica's generation stamp.
  if (replica.getGenerationStamp() >= recoveryId) {
    throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
        + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
        + ", block=" + block + ", replica=" + replica);
  }

  // Not yet under recovery: wrap the replica and register it.
  if (replica.getState() != ReplicaState.RUR) {
    final ReplicaUnderRecovery created =
        new ReplicaUnderRecovery(replica, recoveryId);
    map.add(created);
    DataNode.LOG.info("initReplicaRecovery: changing replica state for "
        + block + " from " + replica.getState()
        + " to " + created.getState());
    return created.createInfo();
  }

  // Already under recovery: only a strictly newer recovery may take over.
  final ReplicaUnderRecovery rur = (ReplicaUnderRecovery) replica;
  if (rur.getRecoveryID() >= recoveryId) {
    throw new RecoveryInProgressException(
        "rur.getRecoveryID() >= recoveryId = " + recoveryId
            + ", block=" + block + ", rur=" + rur);
  }
  final long previousRecoveryId = rur.getRecoveryID();
  rur.setRecoveryID(recoveryId);
  DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
      + " from " + previousRecoveryId + " to " + recoveryId);
  return rur.createInfo();
}
/** Recover a block */ private void recoverBlock(RecoveringBlock rBlock) throws IOException { Block block = rBlock.getBlock(); DatanodeInfo[] targets = rBlock.getLocations(); DatanodeID[] datanodeids = (DatanodeID[])targets; List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length); int errorCount = 0; //check generation stamps for(DatanodeID id : datanodeids) { try { InterDatanodeProtocol datanode = dnRegistration.equals(id)? this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout); ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); if (info != null && info.getGenerationStamp() >= block.getGenerationStamp() && info.getNumBytes() > 0) { syncList.add(new BlockRecord(id, datanode, info)); } } catch (RecoveryInProgressException ripE) { InterDatanodeProtocol.LOG.warn( "Recovery for replica " + block + " on data-node " + id + " is already in progress. Recovery id = " + rBlock.getNewGenerationStamp() + " is aborted.", ripE); return; } catch (IOException e) { ++errorCount; InterDatanodeProtocol.LOG.warn( "Failed to obtain replica info for block (=" + block + ") from datanode (=" + id + ")", e); } } if (errorCount == datanodeids.length) { throw new IOException("All datanodes failed: block=" + block + ", datanodeids=" + Arrays.asList(datanodeids)); } syncBlock(rBlock, syncList); }