private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination, int namespaceId) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); out.writeInt(namespaceId); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); short status = reply.readShort(); if(status == DataTransferProtocol.OP_STATUS_SUCCESS) { return true; } return false; }
/** * Close the given BlockReader and cache its socket. */ private void closeBlockReader(BlockReader reader, boolean reuseConnection) throws IOException { if (reader.hasSentStatusCode()) { Socket oldSock = reader.takeSocket(); if (dfsClient.getDataTransferProtocolVersion() < DataTransferProtocol.READ_REUSE_CONNECTION_VERSION || !reuseConnection) { // close the sock for old datanode. if (oldSock != null) { IOUtils.closeSocket(oldSock); } } else { socketCache.put(oldSock); } } reader.close(); }
/**
 * Reads the block length information (finalized flag followed by the block
 * length) from the data stream. Does nothing when block size transfer is
 * not enabled for this reader.
 *
 * @throws IOException if reading from the stream fails
 */
private synchronized void readBlockSizeInfo() throws IOException {
  if (transferBlockSize) {
    blkLenInfoUpdated = true;
    isBlockFinalized = in.readBoolean();
    updatedBlockLength = in.readLong();

    // Profiling data follows only for sufficiently new protocol versions.
    final boolean hasProfilingData =
        dataTransferVersion >= DataTransferProtocol.READ_PROFILING_VERSION;
    if (hasProfilingData) {
      readDataNodeProfilingData();
    }

    if (LOG.isDebugEnabled()) {
      final String message = "ifBlockComplete? " + isBlockFinalized
          + " block size: " + updatedBlockLength;
      LOG.debug(message);
    }
  }
}
/**
 * Builds the LocatedBlocks flavor selected by {@code type}.
 *
 * @param blocks the located blocks to wrap
 * @param type which meta-info variant to build; any value other than
 *        VERSION_AND_NAMESPACEID or VERSION yields a plain LocatedBlocks
 * @param namespaceid namespace id, used only for VERSION_AND_NAMESPACEID
 * @param methodsFingerprint fingerprint, used only for
 *        VERSION_AND_NAMESPACEID
 * @return the newly created LocatedBlocks instance
 */
LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks,
    BlockMetaInfoType type, int namespaceid, int methodsFingerprint) {
  final LocatedBlocks result;
  switch (type) {
    case VERSION_AND_NAMESPACEID:
      result = new LocatedBlocksWithMetaInfo(
          computeContentSummary().getLength(),
          blocks,
          isUnderConstruction(),
          DataTransferProtocol.DATA_TRANSFER_VERSION,
          namespaceid,
          methodsFingerprint);
      break;
    case VERSION:
      result = new VersionedLocatedBlocks(
          computeContentSummary().getLength(),
          blocks,
          isUnderConstruction(),
          DataTransferProtocol.DATA_TRANSFER_VERSION);
      break;
    default:
      result = new LocatedBlocks(
          computeContentSummary().getLength(),
          blocks,
          isUnderConstruction());
      break;
  }
  return result;
}
private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); BlockTokenSecretManager.DUMMY_TOKEN.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); short status = reply.readShort(); if(status == DataTransferProtocol.OP_STATUS_SUCCESS) { return true; } return false; }
private void testWrite(Block block, BlockConstructionStage stage, long newGS, String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0, stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null, new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN); if (eofExcepted) { ERROR.write(recvOut); sendRecvData(description, true); } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { //ok finally write a block with 0 len SUCCESS.write(recvOut); Text.writeString(recvOut, ""); // first bad node sendRecvData(description, false); } else { writeZeroLengthPacket(block, description); } }
private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); REPLACE_BLOCK.write(out); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); BlockTokenSecretManager.DUMMY_TOKEN.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); return DataTransferProtocol.Status.read(reply) == SUCCESS; }
private boolean replaceBlock( Block block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { Socket sock = new Socket(); sock.connect(NetUtils.createSocketAddr( destination.getName()), HdfsConstants.READ_TIMEOUT); sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); out.writeLong(block.getBlockId()); out.writeLong(block.getGenerationStamp()); Text.writeString(out, source.getStorageID()); sourceProxy.write(out); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); short status = reply.readShort(); if(status == DataTransferProtocol.OP_STATUS_SUCCESS) { return true; } return false; }
void register() throws IOException { // get versions from the namenode nsInfo = nameNode.versionRequest(); dnRegistration.setStorageInfo(new DataStorage(nsInfo, "", null), ""); String storageId = DataNode.createNewStorageId(dnRegistration.getPort()); dnRegistration.setStorageID(storageId); // register datanode dnRegistration = nameNode.register(dnRegistration, DataTransferProtocol.DATA_TRANSFER_VERSION); }
/**
 * Writes a block replace request to the given output stream: first the
 * version and opcode, then the header fields, followed by a flush.
 *
 * @param out stream the request is written to
 * @throws IOException if writing or flushing fails
 */
private void sendRequest(DataOutputStream out) throws IOException {
  final VersionAndOpcode versionAndOp = new VersionAndOpcode(
      dataTransferProtocolVersion, DataTransferProtocol.OP_REPLACE_BLOCK);
  final ReplaceBlockHeader request = new ReplaceBlockHeader(versionAndOp);
  request.set(
      namespaceId,
      block.getBlock().getBlockId(),
      block.getBlock().getGenerationStamp(),
      source.getStorageID(),
      proxySource);

  request.writeVersionAndOpCode(out);
  request.write(out);
  out.flush();
}
/**
 * Reads the status short of a block copy response from the input stream and
 * fails unless it is OP_STATUS_SUCCESS.
 *
 * @param in stream the response is read from
 * @throws IOException if reading fails or the status is not success
 */
private void receiveResponse(DataInputStream in) throws IOException {
  final boolean succeeded =
      in.readShort() == DataTransferProtocol.OP_STATUS_SUCCESS;
  if (succeeded) {
    return;
  }
  throw new IOException("block move is failed");
}
/**
 * Registers a standby with this primary. The call is rejected unless this
 * node passes the active check and the checkpointer address verification.
 *
 * @return the data transfer protocol version of this node
 * @throws IOException if one of the checks fails
 */
@Override
public int register() throws IOException {
  enforceActive("Standby can only register with active namenode");
  verifyCheckpointerAddress();
  final int localTransferVersion = DataTransferProtocol.DATA_TRANSFER_VERSION;
  return localTransferVersion;
}
void updateDataTransferProtocolVersionIfNeeded(int remoteDataTransferVersion) { int newDataTransferVersion = 0; if (remoteDataTransferVersion < DataTransferProtocol.DATA_TRANSFER_VERSION) { // client is newer than server newDataTransferVersion = remoteDataTransferVersion; } else { // client is older or the same as server newDataTransferVersion = DataTransferProtocol.DATA_TRANSFER_VERSION; } synchronized (dataTransferVersion) { if (dataTransferVersion != newDataTransferVersion) { dataTransferVersion = newDataTransferVersion; } } }
int getOutPacketVersion() throws IOException { if (ifPacketIncludeVersion()) { return this.preferredPacketVersion; } else { // If the server side runs on an older version that doesn't support // packet version, the older format that checksum is in the first // is used. // return DataTransferProtocol.PACKET_VERSION_CHECKSUM_FIRST; } }
/**
 * Creates a heartbeat packet in the format matching {@code packetVersion}.
 *
 * @param dfsOutputStream stream the packet belongs to
 * @param includePktVersion whether packets carry a version field
 * @param packetVersion requested packet format version
 * @return a non-inline-checksum packet for PACKET_VERSION_CHECKSUM_FIRST,
 *         otherwise an inline-checksum packet
 * @throws IOException if another format is requested while packets do not
 *         include a version
 */
static DFSOutputStreamPacket getHeartbeatPacket(
    DFSOutputStream dfsOutputStream, boolean includePktVersion,
    int packetVersion) throws IOException {
  final boolean checksumFirst =
      packetVersion == DataTransferProtocol.PACKET_VERSION_CHECKSUM_FIRST;
  if (checksumFirst) {
    return new DFSOutputStreamPacketNonInlineChecksum(dfsOutputStream);
  }
  if (includePktVersion) {
    return new DFSOutputStreamPacketInlineChecksum(dfsOutputStream);
  }
  throw new IOException(
      "Older version doesn't support inline checksum packet format.");
}
/**
 * Creates a data packet in the format matching {@code packetVersion}.
 *
 * @param dfsOutputStream stream the packet belongs to
 * @param includePktVersion whether packets carry a version field
 * @param packetVersion requested packet format version
 * @param pktSize packet size forwarded to the packet constructor
 * @param chunksPerPkt chunks per packet forwarded to the packet constructor
 * @param offsetInBlock block offset forwarded to the packet constructor
 * @param profile client profile forwarded to the packet constructor
 * @return a non-inline-checksum packet for PACKET_VERSION_CHECKSUM_FIRST,
 *         otherwise an inline-checksum packet
 * @throws IOException if another format is requested while packets do not
 *         include a version
 */
static DFSOutputStreamPacket getPacket(DFSOutputStream dfsOutputStream,
    boolean includePktVersion, int packetVersion, int pktSize,
    int chunksPerPkt, long offsetInBlock, WritePacketClientProfile profile)
    throws IOException {
  final boolean checksumFirst =
      packetVersion == DataTransferProtocol.PACKET_VERSION_CHECKSUM_FIRST;
  if (checksumFirst) {
    return new DFSOutputStreamPacketNonInlineChecksum(
        dfsOutputStream, pktSize, chunksPerPkt, offsetInBlock, profile);
  }
  if (includePktVersion) {
    return new DFSOutputStreamPacketInlineChecksum(
        dfsOutputStream, pktSize, chunksPerPkt, offsetInBlock, profile);
  }
  throw new IOException(
      "Older version doesn't support inline checksum packet format.");
}
@Override public synchronized int read(byte[] buf, int off, int len) throws IOException { //for the first read, skip the extra bytes at the front. if (lastChunkLen < 0 && startOffset > firstChunkOffset) { // Skip these bytes. But don't call this.skip()! int toSkip = (int)(startOffset - firstChunkOffset); if ( skipBuf == null ) { skipBuf = new byte[bytesPerChecksum]; } if ( super.read(skipBuf, 0, toSkip) != toSkip ) { // should never happen throw new IOException("Could not skip required number of bytes"); } updateStatsAfterRead(toSkip); } boolean eosBefore = eos; int nRead = super.read(buf, off, len); // if gotEOS was set in the previous read, send a status code to the DN: if (dnSock != null && eos && !eosBefore && nRead >= 0) { if (needChecksum()) { sendReadResult(dnSock, DataTransferProtocol.OP_STATUS_CHECKSUM_OK); } else { sendReadResult(dnSock, DataTransferProtocol.OP_STATUS_SUCCESS); } } updateStatsAfterRead(nRead); return nRead; }
/**
 * Creates a reader for one block and initializes its bookkeeping state.
 *
 * @param file file name, used only to build the synthetic path
 * @param blockId block id, used only to build the synthetic path
 * @param in stream the block data is read from
 * @param checksum checksum descriptor; passed to the superclass only when
 *        its checksum size is positive
 * @param verifyChecksum forwarded to the superclass
 * @param startOffset requested start offset; negative values become 0
 * @param firstChunkOffset offset of the first chunk
 * @param dnSock socket to the datanode
 * @param bytesToCheckReadSpeed stored for later read-speed checks
 * @param minSpeedBps stored minimum read speed
 * @param dataTransferVersion protocol version; determines whether block
 *        size transfer and packet versions are enabled
 * @param cliData client-side profiling data holder
 */
private BlockReader(String file, long blockId, DataInputStream in,
    DataChecksum checksum, boolean verifyChecksum, long startOffset,
    long firstChunkOffset, Socket dnSock, long bytesToCheckReadSpeed,
    long minSpeedBps, long dataTransferVersion,
    FSClientReadProfilingData cliData) {
  super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
      1,
      verifyChecksum,
      checksum.getChecksumSize() > 0 ? checksum : null,
      checksum.getBytesPerChecksum(),
      checksum.getChecksumSize());

  // Connection and stream.
  this.dnSock = dnSock;
  this.in = in;

  // Checksum parameters.
  this.checksum = checksum;
  bytesPerChecksum = this.checksum.getBytesPerChecksum();
  checksumSize = this.checksum.getChecksumSize();

  // Features that depend on the negotiated protocol version.
  this.dataTransferVersion = dataTransferVersion;
  this.transferBlockSize =
      (dataTransferVersion >= DataTransferProtocol.SEND_DATA_LEN_VERSION);
  this.pktIncludeVersion = (dataTransferVersion
      >= DataTransferProtocol.PACKET_INCLUDE_VERSION_VERSION);

  // Read position; lastChunkLen of -1 marks that nothing was read yet.
  this.startOffset = Math.max(startOffset, 0);
  this.firstChunkOffset = firstChunkOffset;
  lastChunkOffset = firstChunkOffset;
  lastChunkLen = -1;

  // Read statistics and speed checking.
  this.bytesRead = 0;
  this.timeRead = 0;
  this.minSpeedBps = minSpeedBps;
  this.bytesToCheckReadSpeed = bytesToCheckReadSpeed;
  this.slownessLoged = false;
  this.cliData = cliData;
}
/** {@inheritDoc} */
public void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  final UnixUserGroupInformation ugi = getUGI(request);
  final PrintWriter writer = response.getWriter();
  final String filename = getFilename(request, response);
  final XMLOutputter xml = new XMLOutputter(writer, "UTF-8");
  xml.declaration();

  // Work on a copy of the daemon configuration when one is published in
  // the servlet context, otherwise on a fresh one.
  final Configuration daemonConf = (Configuration) getServletContext()
      .getAttribute(HttpServer.CONF_CONTEXT_ATTRIBUTE);
  final Configuration conf;
  if (daemonConf != null) {
    conf = new Configuration(daemonConf);
  } else {
    conf = new Configuration();
  }

  final int socketTimeout =
      conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);
  final SocketFactory socketFactory =
      NetUtils.getSocketFactory(conf, ClientProtocol.class);
  UnixUserGroupInformation.saveToConf(
      conf, UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
  final ProtocolProxy<ClientProtocol> nnproxy =
      DFSClient.createRPCNamenode(conf);

  try {
    final MD5MD5CRC32FileChecksum fileChecksum = DFSClient.getFileChecksum(
        DataTransferProtocol.DATA_TRANSFER_VERSION, filename,
        nnproxy.getProxy(), nnproxy, socketFactory, socketTimeout);
    MD5MD5CRC32FileChecksum.write(xml, fileChecksum);
  } catch (IOException ioe) {
    // Report the failure inside the XML document instead of propagating.
    final RemoteException wrapped =
        new RemoteException(ioe.getClass().getName(), ioe.getMessage());
    wrapped.writeXml(filename, xml);
  }
  xml.endDocument();
}
/**
 * Writes a replace-block request to the given stream: the version and
 * opcode first, then the header fields, followed by a flush.
 *
 * @param out stream the request is written to
 * @throws IOException if writing or flushing fails
 */
private void sendRequest(DataOutputStream out) throws IOException {
  /* Build the header */
  final ReplaceBlockHeader header = new ReplaceBlockHeader(
      DataTransferProtocol.DATA_TRANSFER_VERSION,
      namespaceId,
      block.getBlock().getBlockId(),
      block.getBlock().getGenerationStamp(),
      source.getStorageID(),
      proxySource.getDatanode());

  /* Write the header */
  header.writeVersionAndOpCode(out);
  header.write(out);
  out.flush();
}
/** * Reads the metadata and sends the data in one 'DATA_CHUNK'. * @param in */ void readMetadata(DataInputStream in, VersionAndOpcode versionAndOpcode) throws IOException { ReadMetadataHeader readMetadataHeader = new ReadMetadataHeader(versionAndOpcode); readMetadataHeader.readFields(in); final int namespaceId = readMetadataHeader.getNamespaceId(); Block block = new Block(readMetadataHeader.getBlockId(), 0, readMetadataHeader.getGenStamp()); ReplicaToRead rtr; if ((rtr = datanode.data.getReplicaToRead(namespaceId, block)) == null || rtr.isInlineChecksum()) { throw new IOException( "Read metadata from inline checksum file is not supported"); } DataOutputStream out = null; try { updateCurrentThreadName("reading metadata for block " + block); out = new DataOutputStream( NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); byte[] buf = BlockWithChecksumFileReader.getMetaData(datanode.data, namespaceId, block); out.writeByte(DataTransferProtocol.OP_STATUS_SUCCESS); out.writeInt(buf.length); out.write(buf); //last DATA_CHUNK out.writeInt(0); } finally { IOUtils.closeStream(out); } }
/** * When the reader reaches end of the read, it sends a status response * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN * closing our connection (which we will re-open), but won't affect * data correctness. */ void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) { assert !sentStatusCode : "already sent status code to " + sock; try { OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT); statusCode.writeOutputStream(out); out.flush(); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. LOG.info("Could not send read status (" + statusCode + ") to datanode " + sock.getInetAddress() + ": " + e.getMessage()); } }
/**
 * Sends a replace-block operation for the current block to the given
 * stream. A token with REPLACE and COPY access is generated when block
 * tokens are enabled; otherwise the dummy token is sent.
 *
 * @param out stream the request is written to
 * @throws IOException if token generation or writing fails
 */
private void sendRequest(DataOutputStream out) throws IOException {
  final Token<BlockTokenIdentifier> accessToken;
  if (isBlockTokenEnabled) {
    accessToken = blockTokenSecretManager.generateToken(
        null,
        block.getBlock(),
        EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
            BlockTokenSecretManager.AccessMode.COPY));
  } else {
    accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
  }

  DataTransferProtocol.Sender.opReplaceBlock(out, block.getBlock(),
      source.getStorageID(), proxySource.getDatanode(), accessToken);
}
/**
 * Reads the status of a block move from the stream and fails unless it is
 * SUCCESS.
 *
 * @param in stream the status is read from
 * @throws IOException if reading fails or the status is not SUCCESS; an
 *         access token error gets its own message
 */
private void receiveResponse(DataInputStream in) throws IOException {
  final DataTransferProtocol.Status status =
      DataTransferProtocol.Status.read(in);
  if (status == DataTransferProtocol.Status.SUCCESS) {
    return;
  }

  final String reason = (status == ERROR_ACCESS_TOKEN)
      ? "block move failed due to access token error"
      : "block move is failed";
  throw new IOException(reason);
}
/**
 * Writes a single status message to the socket and flushes it.
 *
 * @param s socket to write to
 * @param opStatus status message to write
 * @param timeout send timeout
 * @throws IOException if obtaining the stream, writing or flushing fails
 */
private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
    long timeout) throws IOException {
  final DataOutputStream responseOut =
      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
  opStatus.write(responseOut);
  responseOut.flush();
}
/**
 * Writes a block replace request to the output stream and flushes it. The
 * namespace id is included only when the protocol version is at least
 * FEDERATION_VERSION.
 *
 * @param out stream the request is written to
 * @throws IOException if writing or flushing fails
 */
private void sendRequest(DataOutputStream out) throws IOException {
  // Preamble: protocol version and opcode.
  out.writeShort(dataTransferProtocolVersion);
  out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);

  final boolean sendNamespaceId =
      dataTransferProtocolVersion >= DataTransferProtocol.FEDERATION_VERSION;
  if (sendNamespaceId) {
    out.writeInt(namespaceId);
  }

  // Block identity, then the source storage ID and the proxy source.
  out.writeLong(block.getBlock().getBlockId());
  out.writeLong(block.getBlock().getGenerationStamp());
  Text.writeString(out, source.getStorageID());
  proxySource.write(out);

  out.flush();
}
/**
 * Runs the path filter test once per block size / stripe length
 * combination, after checking the configured data transfer protocol
 * version, and finishes with a policy check. Clusters are always stopped.
 */
public void testPathFilter() throws Exception {
  LOG.info("Test testPathFilter started.");

  final long[] blockSizes = {1024L};
  final int[] stripeLengths = {5, 6, 10, 11, 12};
  final int targetReplication = 1;
  final int metaReplication = 1;
  final int numBlock = 11;
  int iter = 0;

  createClusters(true, false);
  try {
    assertEquals(DataTransferProtocol.DATA_TRANSFER_VERSION,
        RaidUtils.getDataTransferProtocolVersion(conf));

    for (int b = 0; b < blockSizes.length; b++) {
      final long blockSize = blockSizes[b];
      for (int s = 0; s < stripeLengths.length; s++) {
        final int stripeLength = stripeLengths[s];
        this.loadTestCodecs(stripeLength, stripeLength, 1, 3);
        doTestPathFilter(iter, targetReplication, metaReplication,
            stripeLength, blockSize, numBlock);
        iter++;
      }
    }
    doCheckPolicy();
  } finally {
    stopClusters();
  }

  LOG.info("Test testPathFilter completed.");
}
/**
 * Creates a reader for one block and initializes its bookkeeping state.
 *
 * @param file file name, used only to build the synthetic path
 * @param blockId block id, used only to build the synthetic path
 * @param in stream the block data is read from
 * @param checksum checksum descriptor; passed to the superclass only when
 *        its checksum size is positive
 * @param verifyChecksum forwarded to the superclass
 * @param startOffset requested start offset; negative values become 0
 * @param firstChunkOffset offset of the first chunk
 * @param dnSock socket to the datanode
 * @param minSpeedBps stored minimum read speed
 * @param dataTransferVersion protocol version; determines whether block
 *        size transfer is enabled
 */
private BlockReader(String file, long blockId, DataInputStream in,
    DataChecksum checksum, boolean verifyChecksum, long startOffset,
    long firstChunkOffset, Socket dnSock, long minSpeedBps,
    long dataTransferVersion) {
  super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
      1,
      verifyChecksum,
      checksum.getChecksumSize() > 0 ? checksum : null,
      checksum.getBytesPerChecksum(),
      checksum.getChecksumSize());

  // Connection and stream.
  this.dnSock = dnSock;
  this.in = in;

  // Checksum parameters.
  this.checksum = checksum;
  bytesPerChecksum = this.checksum.getBytesPerChecksum();
  checksumSize = this.checksum.getChecksumSize();

  // Feature that depends on the negotiated protocol version.
  this.transferBlockSize =
      (dataTransferVersion >= DataTransferProtocol.SEND_DATA_LEN_VERSION);

  // Read position; lastChunkLen of -1 marks that nothing was read yet.
  this.startOffset = Math.max(startOffset, 0);
  this.firstChunkOffset = firstChunkOffset;
  lastChunkOffset = firstChunkOffset;
  lastChunkLen = -1;

  // Read statistics and speed checking.
  this.bytesRead = 0;
  this.timeRead = 0;
  this.minSpeedBps = minSpeedBps;
  this.slownessLoged = false;
}
private void checksumOk(Socket sock) { try { OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT); byte buf[] = { (DataTransferProtocol.OP_STATUS_CHECKSUM_OK >>> 8) & 0xff, (DataTransferProtocol.OP_STATUS_CHECKSUM_OK) & 0xff }; out.write(buf); out.flush(); } catch (IOException e) { // its ok not to be able to send this. LOG.debug("Could not write to datanode " + sock.getInetAddress() + ": " + e.getMessage()); } }
/** * Reads the metadata and sends the data in one 'DATA_CHUNK'. * * @param in */ void readMetadata(DataInputStream in, VersionAndOpcode versionAndOpcode) throws IOException { ReadMetadataHeader readMetadataHeader = new ReadMetadataHeader( versionAndOpcode); readMetadataHeader.readFields(in); final int namespaceId = readMetadataHeader.getNamespaceId(); Block block = new Block(readMetadataHeader.getBlockId(), 0, readMetadataHeader.getGenStamp()); MetaDataInputStream checksumIn = null; DataOutputStream out = null; updateCurrentThreadName("reading metadata for block " + block); try { checksumIn = datanode.data.getMetaDataInputStream(namespaceId, block); long fileSize = checksumIn.getLength(); if (fileSize >= 1L << 31 || fileSize <= 0) { throw new IOException( "Unexpected size for checksumFile of block" + block); } byte[] buf = new byte[(int) fileSize]; IOUtils.readFully(checksumIn, buf, 0, buf.length); out = new DataOutputStream(NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); out.writeByte(DataTransferProtocol.OP_STATUS_SUCCESS); out.writeInt(buf.length); out.write(buf); // last DATA_CHUNK out.writeInt(0); } finally { IOUtils.closeStream(out); IOUtils.closeStream(checksumIn); } }