private void writeZeroLengthPacket(ExtendedBlock block, String description) throws IOException { PacketHeader hdr = new PacketHeader( 8, // size of packet block.getNumBytes(), // OffsetInBlock 100, // sequencenumber true, // lastPacketInBlock 0, // chunk length false); // sync block hdr.write(sendOut); sendOut.writeInt(0); // zero checksum //ok finally write a block with 0 len sendResponse(Status.SUCCESS, "", null, recvOut); new PipelineAck(100, new int[] {PipelineAck.combineHeader (PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write (recvOut); sendRecvData(description, false); }
private void writeZeroLengthPacket(ExtendedBlock block, String description) throws IOException { PacketHeader hdr = new PacketHeader( 8, // size of packet block.getNumBytes(), // OffsetInBlock 100, // sequencenumber true, // lastPacketInBlock 0, // chunk length false); // sync block hdr.write(sendOut); sendOut.writeInt(0); // zero checksum //ok finally write a block with 0 len sendResponse(Status.SUCCESS, "", null, recvOut); new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut); sendRecvData(description, false); }
private void writeZeroLengthPacket(ExtendedBlock block, String description) throws IOException { PacketHeader hdr = new PacketHeader(8, // size of packet block.getNumBytes(), // OffsetInBlock 100, // sequencenumber true, // lastPacketInBlock 0, // chunk length false); // sync block hdr.write(sendOut); sendOut.writeInt(0); // zero checksum //ok finally write a block with 0 len sendResponse(Status.SUCCESS, "", null, recvOut); new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut); sendRecvData(description, false); }
/**
 * Handles one pipeline ack read from the channel.
 *
 * The status returned by {@code getStatus(ack)} is classified in order:
 * <ol>
 * <li>a restart OOB status fails the channel with a "Restart response"
 * error;</li>
 * <li>any other non-SUCCESS status fails the channel with a "Bad response"
 * error;</li>
 * <li>a SUCCESS ack carrying {@code HEART_BEAT_SEQNO} is ignored;</li>
 * <li>every other SUCCESS ack marks the channel as completed.</li>
 * </ol>
 *
 * @param ctx handler context whose channel the ack arrived on
 * @param ack the ack message that was read
 * @throws Exception declared by the overridden handler method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack)
    throws Exception {
  Status reply = getStatus(ack);
  // The restart check must come before the generic non-SUCCESS check.
  // Placed after it, this branch could only be reached with SUCCESS, so a
  // restart status (presumably never SUCCESS) was always reported as a
  // plain "Bad response".
  if (PipelineAck.isRestartOOBStatus(reply)) {
    failed(ctx.channel(), () -> new IOException("Restart response " + reply
        + " for block " + block + " from datanode "
        + ctx.channel().remoteAddress()));
    return;
  }
  if (reply != Status.SUCCESS) {
    failed(ctx.channel(), () -> new IOException("Bad response " + reply
        + " for block " + block + " from datanode "
        + ctx.channel().remoteAddress()));
    return;
  }
  if (ack.getSeqno() == HEART_BEAT_SEQNO) {
    // Heartbeat acks do not complete anything.
    return;
  }
  completed(ctx.channel());
}
/** * Send an OOB response. If all acks have been sent already for the block * and the responder is about to close, the delivery is not guaranteed. * This is because the other end can close the connection independently. * An OOB coming from downstream will be automatically relayed upstream * by the responder. This method is used only by originating datanode. * * @param ackStatus the type of ack to be sent */ void sendOOBResponse(final Status ackStatus) throws IOException, InterruptedException { if (!running) { LOG.info("Cannot send OOB response " + ackStatus + ". Responder not running."); return; } synchronized(this) { if (sending) { wait(PipelineAck.getOOBTimeout(ackStatus)); // Didn't get my turn in time. Give up. if (sending) { throw new IOException("Could not send OOB reponse in time: " + ackStatus); } } sending = true; } LOG.info("Sending an out of band ack of type " + ackStatus); try { sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L, PipelineAck.combineHeader(datanode.getECN(), ackStatus)); } finally { // Let others send ack. Unless there are miltiple OOB send // calls, there can be only one waiter, the responder thread. // In any case, only one needs to be notified. synchronized(this) { sending = false; notify(); } } }
/** * The wrapper for the unprotected version. This is only called by * the responder's run() method. * * @param ack Ack received from downstream * @param seqno sequence number of ack to be sent upstream * @param totalAckTimeNanos total ack time including all the downstream * nodes * @param offsetInBlock offset in block for the data in packet * @param myHeader the local ack header */ private void sendAckUpstream(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, int myHeader) throws IOException { try { // Wait for other sender to finish. Unless there is an OOB being sent, // the responder won't have to wait. synchronized(this) { while(sending) { wait(); } sending = true; } try { if (!running) return; sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos, offsetInBlock, myHeader); } finally { synchronized(this) { sending = false; notify(); } } } catch (InterruptedException ie) { // The responder was interrupted. Make it go down without // interrupting the receiver(writer) thread. running = false; } }
/**
 * Checks that {@code PipelineAck} reads both ack encodings.
 *
 * An ack serialized with only a reply status must read back with a header
 * of {@code ECN.DISABLED} combined with that status. An ack that also
 * carries a flag must read back with the flag's header.
 */
@Test
public void TestPipeLineAckCompatibility() throws IOException {
  final int disabledHeader = PipelineAck.combineHeader(
      PipelineAck.ECN.DISABLED, Status.CHECKSUM_OK);
  final int supportedHeader = PipelineAck.combineHeader(
      PipelineAck.ECN.SUPPORTED, Status.CHECKSUM_OK);

  // Old-style message: reply status only, no flag.
  DataTransferProtos.PipelineAckProto.Builder oldBuilder =
      DataTransferProtos.PipelineAckProto.newBuilder();
  oldBuilder.setSeqno(0);
  oldBuilder.addReply(Status.CHECKSUM_OK);
  DataTransferProtos.PipelineAckProto proto = oldBuilder.build();

  // New-style message: the same content plus a header flag.
  DataTransferProtos.PipelineAckProto newProto =
      DataTransferProtos.PipelineAckProto.newBuilder()
          .mergeFrom(proto)
          .addFlag(supportedHeader)
          .build();

  ByteArrayOutputStream oldAckBytes = new ByteArrayOutputStream();
  proto.writeDelimitedTo(oldAckBytes);
  PipelineAck oldAck = new PipelineAck();
  oldAck.readFields(new ByteArrayInputStream(oldAckBytes.toByteArray()));
  assertEquals(disabledHeader, oldAck.getHeaderFlag(0));

  PipelineAck newAck = new PipelineAck();
  ByteArrayOutputStream newAckBytes = new ByteArrayOutputStream();
  newProto.writeDelimitedTo(newAckBytes);
  newAck.readFields(new ByteArrayInputStream(newAckBytes.toByteArray()));
  assertEquals(supportedHeader, newAck.getHeaderFlag(0));
}
/** * Send an OOB response. If all acks have been sent already for the block * and the responder is about to close, the delivery is not guaranteed. * This is because the other end can close the connection independently. * An OOB coming from downstream will be automatically relayed upstream * by the responder. This method is used only by originating datanode. * * @param ackStatus the type of ack to be sent */ void sendOOBResponse(final Status ackStatus) throws IOException, InterruptedException { if (!running) { LOG.info("Cannot send OOB response " + ackStatus + ". Responder not running."); return; } synchronized(this) { if (sending) { wait(datanode.getOOBTimeout(ackStatus)); // Didn't get my turn in time. Give up. if (sending) { throw new IOException("Could not send OOB reponse in time: " + ackStatus); } } sending = true; } LOG.info("Sending an out of band ack of type " + ackStatus); try { sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L, PipelineAck.combineHeader(datanode.getECN(), ackStatus)); } finally { // Let others send ack. Unless there are miltiple OOB send // calls, there can be only one waiter, the responder thread. // In any case, only one needs to be notified. synchronized(this) { sending = false; notify(); } } }
/**
 * Reports the ECN state of this DataNode:
 * <ul>
 * <li>{@code ECN.DISABLED} when pipeline ECN support is off;</li>
 * <li>{@code ECN.CONGESTED} when ECN is on and the system load average
 * exceeds {@code NUM_CORES * CONGESTION_RATIO};</li>
 * <li>{@code ECN.SUPPORTED} when ECN is on and the load is at or below
 * that limit.</li>
 * </ul>
 * The load is read from the operating system MXBean on every call.
 * NOTE(review): the JDK documents a negative load average when the value
 * is unavailable; that would presumably map to SUPPORTED here - confirm.
 *
 * @return the ECN value to put into outgoing ack headers
 */
public PipelineAck.ECN getECN() {
  if (pipelineSupportECN) {
    final double systemLoad = ManagementFactory.getOperatingSystemMXBean()
        .getSystemLoadAverage();
    final boolean congested = systemLoad > NUM_CORES * CONGESTION_RATIO;
    if (congested) {
      return PipelineAck.ECN.CONGESTED;
    }
    return PipelineAck.ECN.SUPPORTED;
  }
  return PipelineAck.ECN.DISABLED;
}
/**
 * Verifies that an ack without a flag and an ack with a flag both decode
 * to the expected header.
 *
 * Without a flag, the header read back must equal {@code ECN.DISABLED}
 * combined with the reply status. With a flag, it must equal the flag that
 * was written.
 */
@Test
public void TestPipeLineAckCompatibility() throws IOException {
  final int headerWithoutEcn = PipelineAck.combineHeader(
      PipelineAck.ECN.DISABLED, Status.CHECKSUM_OK);
  final int headerWithEcn = PipelineAck.combineHeader(
      PipelineAck.ECN.SUPPORTED, Status.CHECKSUM_OK);

  // Message carrying a reply status only.
  DataTransferProtos.PipelineAckProto.Builder replyOnly =
      DataTransferProtos.PipelineAckProto.newBuilder();
  replyOnly.setSeqno(0);
  replyOnly.addReply(Status.CHECKSUM_OK);
  DataTransferProtos.PipelineAckProto proto = replyOnly.build();

  // Same message with a header flag added.
  DataTransferProtos.PipelineAckProto.Builder withFlag =
      DataTransferProtos.PipelineAckProto.newBuilder().mergeFrom(proto);
  withFlag.addFlag(headerWithEcn);
  DataTransferProtos.PipelineAckProto newProto = withFlag.build();

  ByteArrayOutputStream oldAckBytes = new ByteArrayOutputStream();
  proto.writeDelimitedTo(oldAckBytes);
  PipelineAck oldAck = new PipelineAck();
  oldAck.readFields(new ByteArrayInputStream(oldAckBytes.toByteArray()));
  assertEquals(headerWithoutEcn, oldAck.getHeaderFlag(0));

  PipelineAck newAck = new PipelineAck();
  ByteArrayOutputStream newAckBytes = new ByteArrayOutputStream();
  newProto.writeDelimitedTo(newAckBytes);
  newAck.readFields(new ByteArrayInputStream(newAckBytes.toByteArray()));
  assertEquals(headerWithEcn, newAck.getHeaderFlag(0));
}
/**
 * With {@code DFS_PIPELINE_ECN_ENABLED} set to true, the first DataNode of
 * a one-node mini cluster must not report {@code ECN.DISABLED}.
 */
@Test
public void testECNFlag() throws IOException {
  final Configuration conf = new Configuration();
  conf.setBoolean(DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED, true);

  MiniDFSCluster cluster = null;
  try {
    MiniDFSCluster.Builder singleNode =
        new MiniDFSCluster.Builder(conf).numDataNodes(1);
    cluster = singleNode.build();
    Assert.assertNotEquals(PipelineAck.ECN.DISABLED,
        cluster.getDataNodes().get(0).getECN());
  } finally {
    // The cluster is null if build() itself failed.
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/** * Send an OOB response. If all acks have been sent already for the block * and the responder is about to close, the delivery is not guaranteed. * This is because the other end can close the connection independently. * An OOB coming from downstream will be automatically relayed upstream * by the responder. This method is used only by originating datanode. * * @param ackStatus the type of ack to be sent */ void sendOOBResponse(final Status ackStatus) throws IOException, InterruptedException { if (!running) { LOG.info("Cannot send OOB response " + ackStatus + ". Responder not running."); return; } synchronized(this) { if (sending) { wait(PipelineAck.getOOBTimeout(ackStatus)); // Didn't get my turn in time. Give up. if (sending) { throw new IOException("Could not send OOB reponse in time: " + ackStatus); } } sending = true; } LOG.info("Sending an out of band ack of type " + ackStatus); try { sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L, ackStatus); } finally { // Let others send ack. Unless there are miltiple OOB send // calls, there can be only one waiter, the responder thread. // In any case, only one needs to be notified. synchronized(this) { sending = false; notify(); } } }
/** * The wrapper for the unprotected version. This is only called by * the responder's run() method. * * @param ack Ack received from downstream * @param seqno sequence number of ack to be sent upstream * @param totalAckTimeNanos total ack time including all the downstream * nodes * @param offsetInBlock offset in block for the data in packet * @param myStatus the local ack status */ private void sendAckUpstream(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, Status myStatus) throws IOException { try { // Wait for other sender to finish. Unless there is an OOB being sent, // the responder won't have to wait. synchronized(this) { while(sending) { wait(); } sending = true; } try { if (!running) return; sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos, offsetInBlock, myStatus); } finally { synchronized(this) { sending = false; notify(); } } } catch (InterruptedException ie) { // The responder was interrupted. Make it go down without // interrupting the receiver(writer) thread. running = false; } }
/**
 * Asks the packet responder to send the restart OOB status upstream.
 *
 * The runnable held by {@code responder} is cast to
 * {@code PacketResponder} and given the status returned by
 * {@code PipelineAck.getRestartOOBStatus()}.
 *
 * @throws IOException propagated from {@code sendOOBResponse}
 * @throws InterruptedException propagated from {@code sendOOBResponse}
 */
public void sendOOB() throws IOException, InterruptedException {
  PacketResponder packetResponder = (PacketResponder) responder.getRunnable();
  packetResponder.sendOOBResponse(PipelineAck.getRestartOOBStatus());
}
/** * @param ack Ack received from downstream * @param seqno sequence number of ack to be sent upstream * @param totalAckTimeNanos total ack time including all the downstream * nodes * @param offsetInBlock offset in block for the data in packet * @param myHeader the local ack header */ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, int myHeader) throws IOException { final int[] replies; if (ack == null) { // A new OOB response is being sent from this node. Regardless of // downstream nodes, reply should contain one reply. replies = new int[] { myHeader }; } else if (mirrorError) { // ack read error int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS); int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR); replies = new int[] {h, h1}; } else { short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack .getNumOfReplies(); replies = new int[ackLen + 1]; replies[0] = myHeader; for (int i = 0; i < ackLen; ++i) { replies[i + 1] = ack.getHeaderFlag(i); } // If the mirror has reported that it received a corrupt packet, // do self-destruct to mark myself bad, instead of making the // mirror node bad. The mirror is guaranteed to be good without // corrupt data on disk. 
if (ackLen > 0 && PipelineAck.getStatusFromHeader(replies[1]) == Status.ERROR_CHECKSUM) { throw new IOException("Shutting down writer and responder " + "since the down streams reported the data sent by this " + "thread is corrupt"); } } PipelineAck replyAck = new PipelineAck(seqno, replies, totalAckTimeNanos); if (replyAck.isSuccess() && offsetInBlock > replicaInfo.getBytesAcked()) { replicaInfo.setBytesAcked(offsetInBlock); } // send my ack back to upstream datanode long begin = Time.monotonicNow(); replyAck.write(upstreamOut); upstreamOut.flush(); long duration = Time.monotonicNow() - begin; if (duration > datanodeSlowLogThresholdMs) { LOG.warn("Slow PacketResponder send ack to upstream took " + duration + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString + ", replyAck=" + replyAck); } else if (LOG.isDebugEnabled()) { LOG.debug(myString + ", replyAck=" + replyAck); } // If a corruption was detected in the received data, terminate after // sending ERROR_CHECKSUM back. Status myStatus = PipelineAck.getStatusFromHeader(myHeader); if (myStatus == Status.ERROR_CHECKSUM) { throw new IOException("Shutting down writer and responder " + "due to a checksum error in received data. The error " + "response has been sent upstream."); } }
/** * @param ack Ack received from downstream * @param seqno sequence number of ack to be sent upstream * @param totalAckTimeNanos total ack time including all the downstream * nodes * @param offsetInBlock offset in block for the data in packet * @param myStatus the local ack status */ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, Status myStatus) throws IOException { Status[] replies = null; if (ack == null) { // A new OOB response is being sent from this node. Regardless of // downstream nodes, reply should contain one reply. replies = new Status[1]; replies[0] = myStatus; } else if (mirrorError) { // ack read error replies = MIRROR_ERROR_STATUS; } else { short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack .getNumOfReplies(); replies = new Status[1 + ackLen]; replies[0] = myStatus; for (int i = 0; i < ackLen; i++) { replies[i + 1] = ack.getReply(i); } // If the mirror has reported that it received a corrupt packet, // do self-destruct to mark myself bad, instead of making the // mirror node bad. The mirror is guaranteed to be good without // corrupt data on disk. 
if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) { throw new IOException("Shutting down writer and responder " + "since the down streams reported the data sent by this " + "thread is corrupt"); } } PipelineAck replyAck = new PipelineAck(seqno, replies, totalAckTimeNanos); if (replyAck.isSuccess() && offsetInBlock > replicaInfo.getBytesAcked()) { replicaInfo.setBytesAcked(offsetInBlock); } // send my ack back to upstream datanode long begin = Time.monotonicNow(); replyAck.write(upstreamOut); upstreamOut.flush(); long duration = Time.monotonicNow() - begin; if (duration > datanodeSlowLogThresholdMs) { LOG.warn("Slow PacketResponder send ack to upstream took " + duration + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString + ", replyAck=" + replyAck); } else if (LOG.isDebugEnabled()) { LOG.debug(myString + ", replyAck=" + replyAck); } // If a corruption was detected in the received data, terminate after // sending ERROR_CHECKSUM back. if (myStatus == Status.ERROR_CHECKSUM) { throw new IOException("Shutting down writer and responder " + "due to a checksum error in received data. The error " + "response has been sent upstream."); } }
/** * @param ack Ack received from downstream * @param seqno sequence number of ack to be sent upstream * @param totalAckTimeNanos total ack time including all the downstream * nodes * @param offsetInBlock offset in block for the data in packet * @param myStatus the local ack status */ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno, long totalAckTimeNanos, long offsetInBlock, Status myStatus) throws IOException { Status[] replies = null; if (ack == null) { // A new OOB response is being sent from this node. Regardless of // downstream nodes, reply should contain one reply. replies = new Status[1]; replies[0] = myStatus; } else if (mirrorError) { // ack read error replies = MIRROR_ERROR_STATUS; } else { short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack .getNumOfReplies(); replies = new Status[1 + ackLen]; replies[0] = myStatus; for (int i = 0; i < ackLen; i++) { replies[i + 1] = ack.getReply(i); } // If the mirror has reported that it received a corrupt packet, // do self-destruct to mark myself bad, instead of making the // mirror node bad. The mirror is guaranteed to be good without // corrupt data on disk. if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) { throw new IOException("Shutting down writer and responder " + "since the down streams reported the data sent by this " + "thread is corrupt"); } } PipelineAck replyAck = new PipelineAck(seqno, replies, totalAckTimeNanos); if (replyAck.isSuccess() && offsetInBlock > replicaInfo.getBytesAcked()) { replicaInfo.setBytesAcked(offsetInBlock); } // send my ack back to upstream datanode replyAck.write(upstreamOut); upstreamOut.flush(); if (LOG.isDebugEnabled()) { LOG.debug(myString + ", replyAck=" + replyAck); } // If a corruption was detected in the received data, terminate after // sending ERROR_CHECKSUM back. if (myStatus == Status.ERROR_CHECKSUM) { throw new IOException("Shutting down writer and responder " + "due to a checksum error in received data. 
The error " + "response has been sent upstream."); } }
/**
 * Reports the ECN state of this DataNode.
 *
 * The documented contract for the value is:
 * <ul>
 * <li>ECN.DISABLED when ECN is disabled.</li>
 * <li>ECN.SUPPORTED when ECN is enabled but the DN still has capacity.</li>
 * <li>ECN.CONGESTED when ECN is enabled and the DN is congested.</li>
 * </ul>
 * This implementation looks only at {@code pipelineSupportECN} and never
 * returns {@code ECN.CONGESTED}.
 *
 * @return {@code ECN.SUPPORTED} when pipeline ECN support is on, otherwise
 *         {@code ECN.DISABLED}
 */
public PipelineAck.ECN getECN() {
  if (pipelineSupportECN) {
    return PipelineAck.ECN.SUPPORTED;
  }
  return PipelineAck.ECN.DISABLED;
}