/** * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. */ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, ExtendedBlock b, BlockConstructionStage stage, final String clientname) { if (DataTransferProtocol.LOG.isDebugEnabled()) { DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": " + b + " (numBytes=" + b.getNumBytes() + ")" + ", stage=" + stage + ", clientname=" + clientname + ", targets=" + Arrays.asList(targets) + ", target storage types=" + (targetStorageTypes == null ? "[]" : Arrays.asList(targetStorageTypes))); } this.targets = targets; this.targetStorageTypes = targetStorageTypes; this.b = b; this.stage = stage; BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); bpReg = bpos.bpRegistration; this.clientname = clientname; this.cachingStrategy = new CachingStrategy(true, getDnConf().readaheadLength); }
private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS, String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); writeBlock(block, stage, newGS, DEFAULT_CHECKSUM); if (eofExcepted) { sendResponse(Status.ERROR, null, null, recvOut); sendRecvData(description, true); } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { //ok finally write a block with 0 len sendResponse(Status.SUCCESS, "", null, recvOut); sendRecvData(description, false); } else { writeZeroLengthPacket(block, description); } }
/**
 * Factory for {@link BlockReceiver}; kept as its own method so tests can
 * substitute the receiver. Every argument is forwarded unchanged, in the
 * same order, to the {@code BlockReceiver} constructor.
 *
 * @return the newly constructed receiver
 * @throws IOException if the {@code BlockReceiver} constructor throws it
 */
@VisibleForTesting
BlockReceiver getBlockReceiver(
    final ExtendedBlock block, final StorageType storageType,
    final DataInputStream in,
    final String inAddr, final String myAddr,
    final BlockConstructionStage stage,
    final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
    final String clientname, final DatanodeInfo srcDataNode,
    final DataNode dn, DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist,
    final boolean pinning) throws IOException {
  final BlockReceiver receiver = new BlockReceiver(
      block, storageType, in,
      inAddr, myAddr,
      stage,
      newGs, minBytesRcvd, maxBytesRcvd,
      clientname, srcDataNode,
      dn, requestedChecksum,
      cachingStrategy,
      allowLazyPersist,
      pinning);
  return receiver;
}
/**
 * Creates a transfer task for block {@code b}.
 *
 * Stores the complete target list, the block, the construction stage and
 * the client name, and resolves the registration of the block pool the
 * block belongs to.
 *
 * @param targets datanodes to pass the block along to
 * @param b the block to transfer
 * @param stage block construction stage recorded for this transfer
 * @param clientname name of the client recorded for this transfer
 */
DataTransfer(DatanodeInfo targets[], ExtendedBlock b,
    BlockConstructionStage stage, final String clientname) {
  if (DataTransferProtocol.LOG.isDebugEnabled()) {
    // The label was previously misspelled "targests=".
    DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
        + b + " (numBytes=" + b.getNumBytes() + ")"
        + ", stage=" + stage
        + ", clientname=" + clientname
        + ", targets=" + Arrays.asList(targets));
  }
  this.targets = targets;
  this.b = b;
  this.stage = stage;
  // The registration comes from the offer service of the block's pool.
  BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
  bpReg = bpos.bpRegistration;
  this.clientname = clientname;
}
private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS, String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl", new DatanodeInfo[1], null, stage, 0, block.getNumBytes(), block.getNumBytes(), newGS, DEFAULT_CHECKSUM); if (eofExcepted) { sendResponse(Status.ERROR, null, null, recvOut); sendRecvData(description, true); } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { //ok finally write a block with 0 len sendResponse(Status.SUCCESS, "", null, recvOut); sendRecvData(description, false); } else { writeZeroLengthPacket(block, description); } }
/**
 * Creates a transfer task for block {@code b}.
 *
 * Stores the complete target list, the block, the construction stage and
 * the client name, and resolves the registration of the block pool the
 * block belongs to.
 *
 * @param targets datanodes to pass the block along to
 * @param b the block to transfer
 * @param stage block construction stage recorded for this transfer
 * @param clientname name of the client recorded for this transfer
 */
DataTransfer(DatanodeInfo targets[], ExtendedBlock b,
    BlockConstructionStage stage, final String clientname) {
  if (DataTransferProtocol.LOG.isDebugEnabled()) {
    // The label was previously misspelled "targests=".
    DataTransferProtocol.LOG.debug(
        getClass().getSimpleName() + ": " + b
            + " (numBytes=" + b.getNumBytes() + ")"
            + ", stage=" + stage
            + ", clientname=" + clientname
            + ", targets=" + Arrays.asList(targets));
  }
  this.targets = targets;
  this.b = b;
  this.stage = stage;
  // The registration comes from the offer service of the block's pool.
  BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
  bpReg = bpos.bpRegistration;
  this.clientname = clientname;
}
private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS, String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl", new DatanodeInfo[1], null, stage, 0, block.getNumBytes(), block.getNumBytes(), newGS, DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); if (eofExcepted) { sendResponse(Status.ERROR, null, null, recvOut); sendRecvData(description, true); } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { //ok finally write a block with 0 len sendResponse(Status.SUCCESS, "", null, recvOut); sendRecvData(description, false); } else { writeZeroLengthPacket(block, description); } }
/**
 * Prepares this streamer for the data streaming stage: renames it after the
 * current file and block, creates and starts a response processor for the
 * current nodes, and switches the stage to DATA_STREAMING.
 */
private void initDataStreaming() {
  final String streamerName =
      "DataStreamer for file " + src + " block " + block;
  setName(streamerName);

  // The processor is started before the stage is switched.
  response = new ResponseProcessor(nodes);
  response.start();

  stage = BlockConstructionStage.DATA_STREAMING;
}
/**
 * Finishes the current block: renames this streamer after the file only,
 * closes the responder and the stream, clears the pipeline, and resets the
 * stage to PIPELINE_SETUP_CREATE.
 */
private void endBlock() {
  final boolean debugOn = DFSClient.LOG.isDebugEnabled();
  if (debugOn) {
    DFSClient.LOG.debug("Closing old block " + block);
  }

  final String streamerName = "DataStreamer for file " + src;
  setName(streamerName);

  closeResponder();
  closeStream();
  setPipeline(null, null, null);

  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
/** * Transfer a replica to the datanode targets. * @param b the block to transfer. * The corresponding replica must be an RBW or a Finalized. * Its GS and numBytes will be set to * the stored GS and the visible length. * @param targets targets to transfer the block to * @param client client name */ void transferReplicaForPipelineRecovery(final ExtendedBlock b, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final String client) throws IOException { final long storedGS; final long visible; final BlockConstructionStage stage; //get replica information synchronized(data) { Block storedBlock = data.getStoredBlock(b.getBlockPoolId(), b.getBlockId()); if (null == storedBlock) { throw new IOException(b + " not found in datanode."); } storedGS = storedBlock.getGenerationStamp(); if (storedGS < b.getGenerationStamp()) { throw new IOException(storedGS + " = storedGS < b.getGenerationStamp(), b=" + b); } // Update the genstamp with storedGS b.setGenerationStamp(storedGS); if (data.isValidRbw(b)) { stage = BlockConstructionStage.TRANSFER_RBW; } else if (data.isValidBlock(b)) { stage = BlockConstructionStage.TRANSFER_FINALIZED; } else { final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId()); throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r); } visible = data.getReplicaVisibleLength(b); } //set visible length b.setNumBytes(visible); if (targets.length > 0) { new DataTransfer(targets, targetStorageTypes, b, stage, client).run(); } }
/**
 * Issues a write-block request through {@code sender} with fixed test
 * arguments: default storage type, the dummy block token, client name "cl",
 * single-slot target and storage-type arrays, a null seventh argument, a
 * pipeline value of 0, the block's byte count passed twice, the default
 * caching strategy, two false flags and a null final argument.
 *
 * @param block block to write
 * @param stage block construction stage to request
 * @param newGS generation stamp passed with the request
 * @param checksum checksum passed with the request
 * @throws IOException if the sender fails
 */
void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
    long newGS, DataChecksum checksum) throws IOException {
  sender.writeBlock(
      block,
      StorageType.DEFAULT,
      BlockTokenSecretManager.DUMMY_TOKEN,
      "cl",
      new DatanodeInfo[1],
      new StorageType[1],
      null,
      stage,
      0,
      block.getNumBytes(),
      block.getNumBytes(),
      newGS,
      checksum,
      CachingStrategy.newDefaultStrategy(),
      false,
      false,
      null);
}
/**
 * Flags every streamer that is both healthy and currently writing with an
 * external error.
 *
 * Each such streamer must be in the DATA_STREAMING stage; otherwise the
 * precondition check fails.
 *
 * @return the streamers that were flagged
 */
private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
  final Set<StripedDataStreamer> flagged = new HashSet<>();
  for (int idx = 0; idx < numAllBlocks; idx++) {
    final StripedDataStreamer current = getStripedDataStreamer(idx);
    // Skip streamers that are unhealthy or not writing.
    if (!current.isHealthy() || !isStreamerWriting(idx)) {
      continue;
    }
    final boolean streaming =
        current.getStage() == BlockConstructionStage.DATA_STREAMING;
    Preconditions.checkState(streaming, "streamer: " + current);
    current.setExternalError();
    flagged.add(current);
  }
  return flagged;
}
/**
 * Constructs a data streamer that starts in the PIPELINE_SETUP_CREATE stage.
 *
 * All arguments are handed to the general constructor, with {@code false}
 * for its boolean flag (NOTE(review): presumably "is append"; confirm
 * against that constructor).
 *
 * @param favoredNodes passed through to the general constructor
 */
DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
    String src, Progressable progress, DataChecksum checksum,
    AtomicReference<CachingStrategy> cachingStrategy,
    ByteArrayManager byteArrayManage,
    String[] favoredNodes) {
  this(
      stat,
      block,
      dfsClient,
      src,
      progress,
      checksum,
      cachingStrategy,
      byteArrayManage,
      false,
      favoredNodes);
  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
/**
 * Constructs a data streamer that appends to the last partial block of a
 * file. The streamer starts in the PIPELINE_SETUP_APPEND stage, with the
 * sent-bytes counter set to the block's current length and the access token
 * taken from {@code lastBlock}.
 *
 * @param lastBlock last block of the file to be appended
 * @param stat status of the file to be appended
 */
DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient,
    String src, Progressable progress, DataChecksum checksum,
    AtomicReference<CachingStrategy> cachingStrategy,
    ByteArrayManager byteArrayManage) {
  this(
      stat,
      lastBlock.getBlock(),
      dfsClient,
      src,
      progress,
      checksum,
      cachingStrategy,
      byteArrayManage,
      true,
      null);

  stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
  // Bytes already in the block count as sent.
  bytesSent = block.getNumBytes();
  accessToken = lastBlock.getBlockToken();
}
/**
 * Finishes the current block: renames this streamer after the file only,
 * closes the responder and the stream, clears the pipeline, and resets the
 * stage to PIPELINE_SETUP_CREATE.
 */
protected void endBlock() {
  // Guarded so the message (and block.toString()) is only built when debug
  // logging is actually enabled.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Closing old block " + block);
  }
  this.setName("DataStreamer for file " + src);
  closeResponder();
  closeStream();
  setPipeline(null, null, null);
  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
/**
 * Runs a streamer whose block stream fails on flush, with one congested
 * node and one queued packet, and checks that the congested-node list is
 * empty once {@code run()} returns.
 */
@Test
public void testCongestionBackoff() throws IOException {
  // A mocked client that reports itself as running.
  DfsClientConf conf = mock(DfsClientConf.class);
  DFSClient dfsClient = mock(DFSClient.class);
  when(dfsClient.getConf()).thenReturn(conf);
  when(dfsClient.getTracer()).thenReturn(FsTracer.get(new Configuration()));
  dfsClient.clientRunning = true;

  DataStreamer streamer = new DataStreamer(
      mock(HdfsFileStatus.class),
      mock(ExtendedBlock.class),
      dfsClient,
      "foo", null, null, null, null, null);

  // The block stream throws as soon as it is flushed.
  DataOutputStream failingStream = mock(DataOutputStream.class);
  doThrow(new IOException()).when(failingStream).flush();
  Whitebox.setInternalState(streamer, "blockStream", failingStream);
  Whitebox.setInternalState(streamer, "stage",
      BlockConstructionStage.PIPELINE_CLOSE);

  @SuppressWarnings("unchecked")
  LinkedList<DFSPacket> pendingPackets = (LinkedList<DFSPacket>)
      Whitebox.getInternalState(streamer, "dataQueue");
  @SuppressWarnings("unchecked")
  ArrayList<DatanodeInfo> congested = (ArrayList<DatanodeInfo>)
      Whitebox.getInternalState(streamer, "congestedNodes");

  // One congested node and one queued packet.
  congested.add(mock(DatanodeInfo.class));
  DFSPacket queuedPacket = mock(DFSPacket.class);
  when(queuedPacket.getTraceParents()).thenReturn(new SpanId[0]);
  pendingPackets.add(queuedPacket);

  streamer.run();

  Assert.assertTrue(congested.isEmpty());
}
/** * construction with tracing info */ private DataStreamer(HdfsFileStatus stat, Span span) { isAppend = false; isLazyPersistFile = isLazyPersist(stat); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; traceSpan = span; }
/**
 * Issues a write-block request through {@code sender} with fixed test
 * arguments: default storage type, the dummy block token, client name "cl",
 * single-slot target and storage-type arrays, a null seventh argument, a
 * pipeline value of 0, the block's byte count passed twice, the default
 * caching strategy and a false final flag.
 *
 * @param block block to write
 * @param stage block construction stage to request
 * @param newGS generation stamp passed with the request
 * @param checksum checksum passed with the request
 * @throws IOException if the sender fails
 */
void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
    long newGS, DataChecksum checksum) throws IOException {
  sender.writeBlock(
      block,
      StorageType.DEFAULT,
      BlockTokenSecretManager.DUMMY_TOKEN,
      "cl",
      new DatanodeInfo[1],
      new StorageType[1],
      null,
      stage,
      0,
      block.getNumBytes(),
      block.getNumBytes(),
      newGS,
      checksum,
      CachingStrategy.newDefaultStrategy(),
      false);
}