Java 类org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck 实例源码

项目:hadoop    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a SUCCESS {@link PipelineAck} to {@code recvOut}, and then calls
 * {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  int successHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.SUCCESS);
  PipelineAck ack = new PipelineAck(seqno, new int[] {successHeader});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:aliyun-oss-hadoop-fs    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a SUCCESS {@link PipelineAck} to {@code recvOut}, and then calls
 * {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  int successHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.SUCCESS);
  PipelineAck ack = new PipelineAck(seqno, new int[] {successHeader});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:big-c    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a SUCCESS {@link PipelineAck} to {@code recvOut}, and then calls
 * {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  int successHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.SUCCESS);
  PipelineAck ack = new PipelineAck(seqno, new int[] {successHeader});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a single-reply SUCCESS {@link PipelineAck} to {@code recvOut}, and then
 * calls {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  PipelineAck ack = new PipelineAck(seqno, new Status[] {Status.SUCCESS});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:hadoop-plus    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a single-reply SUCCESS {@link PipelineAck} to {@code recvOut}, and then
 * calls {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  PipelineAck ack = new PipelineAck(seqno, new Status[] {Status.SUCCESS});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:FlexMap    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a single-reply SUCCESS {@link PipelineAck} to {@code recvOut}, and then
 * calls {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  PipelineAck ack = new PipelineAck(seqno, new Status[] {Status.SUCCESS});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:hops    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a single-reply SUCCESS {@link PipelineAck} to {@code recvOut}, and then
 * calls {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  PipelineAck ack = new PipelineAck(seqno, new Status[] {Status.SUCCESS});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:hbase    文件:FanOutOneBlockAsyncDFSOutput.java   
/**
 * Handles one pipeline ack read from a datanode channel.
 * <p>
 * A restart out-of-band status or any other non-SUCCESS status fails the
 * channel with an {@link IOException}; a successful heartbeat ack is ignored;
 * any other successful ack marks the channel as completed.
 *
 * @param ctx channel context the ack arrived on
 * @param ack the decoded pipeline ack
 * @throws Exception as declared by the overridden handler method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
  Status reply = getStatus(ack);
  // Test for the restart OOB status first. When this check came after the
  // generic non-SUCCESS check it was only reached with reply == SUCCESS, so
  // the more specific "Restart response" error could never be reported.
  if (PipelineAck.isRestartOOBStatus(reply)) {
    failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
      block + " from datanode " + ctx.channel().remoteAddress()));
    return;
  }
  if (reply != Status.SUCCESS) {
    failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
      block + " from datanode " + ctx.channel().remoteAddress()));
    return;
  }
  // Successful heartbeat acks carry no data acknowledgement.
  if (ack.getSeqno() == HEART_BEAT_SEQNO) {
    return;
  }
  completed(ctx.channel());
}
项目:hadoop-TCP    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a single-reply SUCCESS {@link PipelineAck} to {@code recvOut}, and then
 * calls {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  PipelineAck ack = new PipelineAck(seqno, new Status[] {Status.SUCCESS});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:hardfs    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a single-reply SUCCESS {@link PipelineAck} to {@code recvOut}, and then
 * calls {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  PipelineAck ack = new PipelineAck(seqno, new Status[] {Status.SUCCESS});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:hadoop-on-lustre2    文件:TestDataTransferProtocol.java   
/**
 * Writes a zero-length, last-in-block packet header for {@code block} plus
 * a zero checksum to {@code sendOut}, writes a SUCCESS response followed by
 * a single-reply SUCCESS {@link PipelineAck} to {@code recvOut}, and then
 * calls {@code sendRecvData}.
 *
 * @param block block whose byte count is used as the packet's offset
 * @param description passed unchanged to {@code sendRecvData}
 * @throws IOException propagated from the stream writes and helper calls
 */
private void writeZeroLengthPacket(ExtendedBlock block, String description)
    throws IOException {
  final int packetSize = 8;
  final int seqno = 100;
  final int chunkLength = 0;
  final boolean lastPacketInBlock = true;
  final boolean syncBlock = false;

  // Header of the empty terminating packet, followed by a zero checksum.
  PacketHeader emptyPacket = new PacketHeader(packetSize, block.getNumBytes(),
      seqno, lastPacketInBlock, chunkLength, syncBlock);
  emptyPacket.write(sendOut);
  sendOut.writeInt(0);

  // Finally write the response and ack for a block with 0 length.
  sendResponse(Status.SUCCESS, "", null, recvOut);
  PipelineAck ack = new PipelineAck(seqno, new Status[] {Status.SUCCESS});
  ack.write(recvOut);

  sendRecvData(description, false);
}
项目:hadoop    文件:BlockReceiver.java   
/**
 * Send an OOB response. If all acks have been sent already for the block
 * and the responder is about to close, the delivery is not guaranteed.
 * This is because the other end can close the connection independently.
 * An OOB coming from downstream will be automatically relayed upstream
 * by the responder. This method is used only by originating datanode.
 * <p>
 * Returns without sending when the responder is not running.
 *
 * @param ackStatus the type of ack to be sent
 * @throws IOException if the {@code sending} flag is still set after the
 *           timed wait, or propagated from the underlying send
 * @throws InterruptedException if interrupted while waiting for the
 *           {@code sending} flag to clear
 */
void sendOOBResponse(final Status ackStatus) throws IOException,
    InterruptedException {
  // Nothing to do once the responder has stopped.
  if (!running) {
    LOG.info("Cannot send OOB response " + ackStatus + 
        ". Responder not running.");
    return;
  }

  // Claim the sending flag under this object's monitor.
  synchronized(this) {
    if (sending) {
      // NOTE(review): this is a single timed wait, not a loop. Any wakeup
      // while sending is still set takes the give-up path below.
      wait(PipelineAck.getOOBTimeout(ackStatus));
      // Didn't get my turn in time. Give up.
      if (sending) {
        throw new IOException("Could not send OOB reponse in time: "
            + ackStatus);
      }
    }
    sending = true;
  }

  LOG.info("Sending an out of band ack of type " + ackStatus);
  try {
    // No downstream ack is passed (null); the timing and offset arguments
    // are zero. The local header combines this datanode's ECN with ackStatus.
    sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
        PipelineAck.combineHeader(datanode.getECN(), ackStatus));
  } finally {
    // Let others send ack. Unless there are multiple OOB send
    // calls, there can be only one waiter, the responder thread.
    // In any case, only one needs to be notified.
    synchronized(this) {
      sending = false;
      notify();
    }
  }
}
项目:hadoop    文件:BlockReceiver.java   
/**
 * The wrapper for the unprotected version. This is only called by
 * the responder's run() method.
 * <p>
 * Waits until the {@code sending} flag is clear, claims it, sends the ack
 * if the responder is still running, then clears the flag and notifies one
 * waiter. An interrupt during the wait is not rethrown; it only clears
 * {@code running}.
 *
 * @param ack Ack received from downstream
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myHeader the local ack header
 * @throws IOException propagated from {@code sendAckUpstreamUnprotected}
 */
private void sendAckUpstream(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock,
    int myHeader) throws IOException {
  try {
    // Wait for other sender to finish. Unless there is an OOB being sent,
    // the responder won't have to wait.
    synchronized(this) {
      while(sending) {
        wait();
      }
      sending = true;
    }

    try {
      // Skip the send if the responder is no longer running; the finally
      // block below still releases the flag.
      if (!running) return;
      sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
          offsetInBlock, myHeader);
    } finally {
      // Release the flag and wake one waiting sender.
      synchronized(this) {
        sending = false;
        notify();
      }
    }
  } catch (InterruptedException ie) {
    // The responder was interrupted. Make it go down without
    // interrupting the receiver(writer) thread.
    running = false;
  }
}
项目:hadoop    文件:TestDataTransferProtocol.java   
/**
 * Checks how {@link PipelineAck#readFields} decodes two serialized acks: one
 * built with only a CHECKSUM_OK reply must yield the header
 * (ECN.DISABLED, CHECKSUM_OK), and one that additionally carries an explicit
 * flag must yield that flag, (ECN.SUPPORTED, CHECKSUM_OK).
 */
@Test
public void TestPipeLineAckCompatibility() throws IOException {
  final int disabledHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.CHECKSUM_OK);
  final int supportedHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status.CHECKSUM_OK);

  // Ack carrying only a reply status and no flag entry.
  DataTransferProtos.PipelineAckProto replyOnlyProto =
      DataTransferProtos.PipelineAckProto.newBuilder()
          .setSeqno(0)
          .addReply(Status.CHECKSUM_OK)
          .build();
  // The same ack with a flag entry added.
  DataTransferProtos.PipelineAckProto flaggedProto =
      DataTransferProtos.PipelineAckProto.newBuilder()
          .mergeFrom(replyOnlyProto)
          .addFlag(supportedHeader)
          .build();

  ByteArrayOutputStream replyOnlyBytes = new ByteArrayOutputStream();
  replyOnlyProto.writeDelimitedTo(replyOnlyBytes);
  PipelineAck replyOnlyAck = new PipelineAck();
  replyOnlyAck.readFields(
      new ByteArrayInputStream(replyOnlyBytes.toByteArray()));
  assertEquals(disabledHeader, replyOnlyAck.getHeaderFlag(0));

  PipelineAck flaggedAck = new PipelineAck();
  ByteArrayOutputStream flaggedBytes = new ByteArrayOutputStream();
  flaggedProto.writeDelimitedTo(flaggedBytes);
  flaggedAck.readFields(new ByteArrayInputStream(flaggedBytes.toByteArray()));
  assertEquals(supportedHeader, flaggedAck.getHeaderFlag(0));
}
项目:aliyun-oss-hadoop-fs    文件:BlockReceiver.java   
/**
 * Send an OOB response. If all acks have been sent already for the block
 * and the responder is about to close, the delivery is not guaranteed.
 * This is because the other end can close the connection independently.
 * An OOB coming from downstream will be automatically relayed upstream
 * by the responder. This method is used only by originating datanode.
 * <p>
 * Returns without sending when the responder is not running.
 *
 * @param ackStatus the type of ack to be sent
 * @throws IOException if the {@code sending} flag is still set after the
 *           timed wait, or propagated from the underlying send
 * @throws InterruptedException if interrupted while waiting for the
 *           {@code sending} flag to clear
 */
void sendOOBResponse(final Status ackStatus) throws IOException,
    InterruptedException {
  // Nothing to do once the responder has stopped.
  if (!running) {
    LOG.info("Cannot send OOB response " + ackStatus + 
        ". Responder not running.");
    return;
  }

  // Claim the sending flag under this object's monitor.
  synchronized(this) {
    if (sending) {
      // The timeout comes from the datanode, keyed by the ack status.
      // NOTE(review): this is a single timed wait, not a loop. Any wakeup
      // while sending is still set takes the give-up path below.
      wait(datanode.getOOBTimeout(ackStatus));
      // Didn't get my turn in time. Give up.
      if (sending) {
        throw new IOException("Could not send OOB reponse in time: "
            + ackStatus);
      }
    }
    sending = true;
  }

  LOG.info("Sending an out of band ack of type " + ackStatus);
  try {
    // No downstream ack is passed (null); the timing and offset arguments
    // are zero. The local header combines this datanode's ECN with ackStatus.
    sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
        PipelineAck.combineHeader(datanode.getECN(), ackStatus));
  } finally {
    // Let others send ack. Unless there are multiple OOB send
    // calls, there can be only one waiter, the responder thread.
    // In any case, only one needs to be notified.
    synchronized(this) {
      sending = false;
      notify();
    }
  }
}
项目:aliyun-oss-hadoop-fs    文件:BlockReceiver.java   
/**
 * The wrapper for the unprotected version. This is only called by
 * the responder's run() method.
 * <p>
 * Waits until the {@code sending} flag is clear, claims it, sends the ack
 * if the responder is still running, then clears the flag and notifies one
 * waiter. An interrupt during the wait is not rethrown; it only clears
 * {@code running}.
 *
 * @param ack Ack received from downstream
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myHeader the local ack header
 * @throws IOException propagated from {@code sendAckUpstreamUnprotected}
 */
private void sendAckUpstream(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock,
    int myHeader) throws IOException {
  try {
    // Wait for other sender to finish. Unless there is an OOB being sent,
    // the responder won't have to wait.
    synchronized(this) {
      while(sending) {
        wait();
      }
      sending = true;
    }

    try {
      // Skip the send if the responder is no longer running; the finally
      // block below still releases the flag.
      if (!running) return;
      sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
          offsetInBlock, myHeader);
    } finally {
      // Release the flag and wake one waiting sender.
      synchronized(this) {
        sending = false;
        notify();
      }
    }
  } catch (InterruptedException ie) {
    // The responder was interrupted. Make it go down without
    // interrupting the receiver(writer) thread.
    running = false;
  }
}
项目:aliyun-oss-hadoop-fs    文件:DataNode.java   
/**
 * Reports the ECN state of this DataNode:
 * <ul>
 *   <li>ECN.DISABLED when pipeline ECN support is turned off.</li>
 *   <li>ECN.CONGESTED when ECN is on and the system load average exceeds
 *       {@code NUM_CORES * CONGESTION_RATIO}.</li>
 *   <li>ECN.SUPPORTED when ECN is on and the load is at or below that
 *       threshold.</li>
 * </ul>
 *
 * @return the ECN value to advertise in pipeline acks
 */
public PipelineAck.ECN getECN() {
  if (pipelineSupportECN) {
    final double systemLoad = ManagementFactory.getOperatingSystemMXBean()
        .getSystemLoadAverage();
    final double congestionThreshold = NUM_CORES * CONGESTION_RATIO;
    if (systemLoad > congestionThreshold) {
      return PipelineAck.ECN.CONGESTED;
    }
    return PipelineAck.ECN.SUPPORTED;
  }
  return PipelineAck.ECN.DISABLED;
}
项目:aliyun-oss-hadoop-fs    文件:TestDataTransferProtocol.java   
/**
 * Checks how {@link PipelineAck#readFields} decodes two serialized acks: one
 * built with only a CHECKSUM_OK reply must yield the header
 * (ECN.DISABLED, CHECKSUM_OK), and one that additionally carries an explicit
 * flag must yield that flag, (ECN.SUPPORTED, CHECKSUM_OK).
 */
@Test
public void TestPipeLineAckCompatibility() throws IOException {
  final int disabledHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.CHECKSUM_OK);
  final int supportedHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status.CHECKSUM_OK);

  // Ack carrying only a reply status and no flag entry.
  DataTransferProtos.PipelineAckProto replyOnlyProto =
      DataTransferProtos.PipelineAckProto.newBuilder()
          .setSeqno(0)
          .addReply(Status.CHECKSUM_OK)
          .build();
  // The same ack with a flag entry added.
  DataTransferProtos.PipelineAckProto flaggedProto =
      DataTransferProtos.PipelineAckProto.newBuilder()
          .mergeFrom(replyOnlyProto)
          .addFlag(supportedHeader)
          .build();

  ByteArrayOutputStream replyOnlyBytes = new ByteArrayOutputStream();
  replyOnlyProto.writeDelimitedTo(replyOnlyBytes);
  PipelineAck replyOnlyAck = new PipelineAck();
  replyOnlyAck.readFields(
      new ByteArrayInputStream(replyOnlyBytes.toByteArray()));
  assertEquals(disabledHeader, replyOnlyAck.getHeaderFlag(0));

  PipelineAck flaggedAck = new PipelineAck();
  ByteArrayOutputStream flaggedBytes = new ByteArrayOutputStream();
  flaggedProto.writeDelimitedTo(flaggedBytes);
  flaggedAck.readFields(new ByteArrayInputStream(flaggedBytes.toByteArray()));
  assertEquals(supportedHeader, flaggedAck.getHeaderFlag(0));
}
项目:aliyun-oss-hadoop-fs    文件:TestDataNodeECN.java   
/**
 * Starts a one-datanode mini cluster with pipeline ECN enabled in the
 * configuration and asserts that the datanode does not report
 * {@code ECN.DISABLED}. The cluster is shut down afterwards.
 */
@Test
public void testECNFlag() throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean(DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED, true);

  // If the build fails there is no cluster to shut down, so it can sit
  // outside the try block.
  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    PipelineAck.ECN reported = cluster.getDataNodes().get(0).getECN();
    Assert.assertNotEquals(PipelineAck.ECN.DISABLED, reported);
  } finally {
    cluster.shutdown();
  }
}
项目:big-c    文件:BlockReceiver.java   
/**
 * Send an OOB response. If all acks have been sent already for the block
 * and the responder is about to close, the delivery is not guaranteed.
 * This is because the other end can close the connection independently.
 * An OOB coming from downstream will be automatically relayed upstream
 * by the responder. This method is used only by originating datanode.
 * <p>
 * Returns without sending when the responder is not running.
 *
 * @param ackStatus the type of ack to be sent
 * @throws IOException if the {@code sending} flag is still set after the
 *           timed wait, or propagated from the underlying send
 * @throws InterruptedException if interrupted while waiting for the
 *           {@code sending} flag to clear
 */
void sendOOBResponse(final Status ackStatus) throws IOException,
    InterruptedException {
  // Nothing to do once the responder has stopped.
  if (!running) {
    LOG.info("Cannot send OOB response " + ackStatus + 
        ". Responder not running.");
    return;
  }

  // Claim the sending flag under this object's monitor.
  synchronized(this) {
    if (sending) {
      // NOTE(review): this is a single timed wait, not a loop. Any wakeup
      // while sending is still set takes the give-up path below.
      wait(PipelineAck.getOOBTimeout(ackStatus));
      // Didn't get my turn in time. Give up.
      if (sending) {
        throw new IOException("Could not send OOB reponse in time: "
            + ackStatus);
      }
    }
    sending = true;
  }

  LOG.info("Sending an out of band ack of type " + ackStatus);
  try {
    // No downstream ack is passed (null); the timing and offset arguments
    // are zero. The local header combines this datanode's ECN with ackStatus.
    sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
        PipelineAck.combineHeader(datanode.getECN(), ackStatus));
  } finally {
    // Let others send ack. Unless there are multiple OOB send
    // calls, there can be only one waiter, the responder thread.
    // In any case, only one needs to be notified.
    synchronized(this) {
      sending = false;
      notify();
    }
  }
}
项目:big-c    文件:BlockReceiver.java   
/**
 * The wrapper for the unprotected version. This is only called by
 * the responder's run() method.
 * <p>
 * Waits until the {@code sending} flag is clear, claims it, sends the ack
 * if the responder is still running, then clears the flag and notifies one
 * waiter. An interrupt during the wait is not rethrown; it only clears
 * {@code running}.
 *
 * @param ack Ack received from downstream
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myHeader the local ack header
 * @throws IOException propagated from {@code sendAckUpstreamUnprotected}
 */
private void sendAckUpstream(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock,
    int myHeader) throws IOException {
  try {
    // Wait for other sender to finish. Unless there is an OOB being sent,
    // the responder won't have to wait.
    synchronized(this) {
      while(sending) {
        wait();
      }
      sending = true;
    }

    try {
      // Skip the send if the responder is no longer running; the finally
      // block below still releases the flag.
      if (!running) return;
      sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
          offsetInBlock, myHeader);
    } finally {
      // Release the flag and wake one waiting sender.
      synchronized(this) {
        sending = false;
        notify();
      }
    }
  } catch (InterruptedException ie) {
    // The responder was interrupted. Make it go down without
    // interrupting the receiver(writer) thread.
    running = false;
  }
}
项目:big-c    文件:TestDataTransferProtocol.java   
/**
 * Checks how {@link PipelineAck#readFields} decodes two serialized acks: one
 * built with only a CHECKSUM_OK reply must yield the header
 * (ECN.DISABLED, CHECKSUM_OK), and one that additionally carries an explicit
 * flag must yield that flag, (ECN.SUPPORTED, CHECKSUM_OK).
 */
@Test
public void TestPipeLineAckCompatibility() throws IOException {
  final int disabledHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.CHECKSUM_OK);
  final int supportedHeader =
      PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status.CHECKSUM_OK);

  // Ack carrying only a reply status and no flag entry.
  DataTransferProtos.PipelineAckProto replyOnlyProto =
      DataTransferProtos.PipelineAckProto.newBuilder()
          .setSeqno(0)
          .addReply(Status.CHECKSUM_OK)
          .build();
  // The same ack with a flag entry added.
  DataTransferProtos.PipelineAckProto flaggedProto =
      DataTransferProtos.PipelineAckProto.newBuilder()
          .mergeFrom(replyOnlyProto)
          .addFlag(supportedHeader)
          .build();

  ByteArrayOutputStream replyOnlyBytes = new ByteArrayOutputStream();
  replyOnlyProto.writeDelimitedTo(replyOnlyBytes);
  PipelineAck replyOnlyAck = new PipelineAck();
  replyOnlyAck.readFields(
      new ByteArrayInputStream(replyOnlyBytes.toByteArray()));
  assertEquals(disabledHeader, replyOnlyAck.getHeaderFlag(0));

  PipelineAck flaggedAck = new PipelineAck();
  ByteArrayOutputStream flaggedBytes = new ByteArrayOutputStream();
  flaggedProto.writeDelimitedTo(flaggedBytes);
  flaggedAck.readFields(new ByteArrayInputStream(flaggedBytes.toByteArray()));
  assertEquals(supportedHeader, flaggedAck.getHeaderFlag(0));
}
项目:hadoop-2.6.0-cdh5.4.3    文件:BlockReceiver.java   
/**
 * Send an OOB response. If all acks have been sent already for the block
 * and the responder is about to close, the delivery is not guaranteed.
 * This is because the other end can close the connection independently.
 * An OOB coming from downstream will be automatically relayed upstream
 * by the responder. This method is used only by originating datanode.
 * <p>
 * Returns without sending when the responder is not running.
 *
 * @param ackStatus the type of ack to be sent
 * @throws IOException if the {@code sending} flag is still set after the
 *           timed wait, or propagated from the underlying send
 * @throws InterruptedException if interrupted while waiting for the
 *           {@code sending} flag to clear
 */
void sendOOBResponse(final Status ackStatus) throws IOException,
    InterruptedException {
  // Nothing to do once the responder has stopped.
  if (!running) {
    LOG.info("Cannot send OOB response " + ackStatus + 
        ". Responder not running.");
    return;
  }

  // Claim the sending flag under this object's monitor.
  synchronized(this) {
    if (sending) {
      // NOTE(review): this is a single timed wait, not a loop. Any wakeup
      // while sending is still set takes the give-up path below.
      wait(PipelineAck.getOOBTimeout(ackStatus));
      // Didn't get my turn in time. Give up.
      if (sending) {
        throw new IOException("Could not send OOB reponse in time: "
            + ackStatus);
      }
    }
    sending = true;
  }

  LOG.info("Sending an out of band ack of type " + ackStatus);
  try {
    // No downstream ack is passed (null); the timing and offset arguments
    // are zero, and ackStatus is passed as the local status.
    sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
        ackStatus);
  } finally {
    // Let others send ack. Unless there are multiple OOB send
    // calls, there can be only one waiter, the responder thread.
    // In any case, only one needs to be notified.
    synchronized(this) {
      sending = false;
      notify();
    }
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:BlockReceiver.java   
/**
 * The wrapper for the unprotected version. This is only called by
 * the responder's run() method.
 * <p>
 * Waits until the {@code sending} flag is clear, claims it, sends the ack
 * if the responder is still running, then clears the flag and notifies one
 * waiter. An interrupt during the wait is not rethrown; it only clears
 * {@code running}.
 *
 * @param ack Ack received from downstream
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myStatus the local ack status
 * @throws IOException propagated from {@code sendAckUpstreamUnprotected}
 */
private void sendAckUpstream(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock,
    Status myStatus) throws IOException {
  try {
    // Wait for other sender to finish. Unless there is an OOB being sent,
    // the responder won't have to wait.
    synchronized(this) {
      while(sending) {
        wait();
      }
      sending = true;
    }

    try {
      // Skip the send if the responder is no longer running; the finally
      // block below still releases the flag.
      if (!running) return;
      sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
          offsetInBlock, myStatus);
    } finally {
      // Release the flag and wake one waiting sender.
      synchronized(this) {
        sending = false;
        notify();
      }
    }
  } catch (InterruptedException ie) {
    // The responder was interrupted. Make it go down without
    // interrupting the receiver(writer) thread.
    running = false;
  }
}
项目:FlexMap    文件:BlockReceiver.java   
/**
 * Send an OOB response. If all acks have been sent already for the block
 * and the responder is about to close, the delivery is not guaranteed.
 * This is because the other end can close the connection independently.
 * An OOB coming from downstream will be automatically relayed upstream
 * by the responder. This method is used only by originating datanode.
 * <p>
 * Returns without sending when the responder is not running.
 *
 * @param ackStatus the type of ack to be sent
 * @throws IOException if the {@code sending} flag is still set after the
 *           timed wait, or propagated from the underlying send
 * @throws InterruptedException if interrupted while waiting for the
 *           {@code sending} flag to clear
 */
void sendOOBResponse(final Status ackStatus) throws IOException,
    InterruptedException {
  // Nothing to do once the responder has stopped.
  if (!running) {
    LOG.info("Cannot send OOB response " + ackStatus + 
        ". Responder not running.");
    return;
  }

  // Claim the sending flag under this object's monitor.
  synchronized(this) {
    if (sending) {
      // NOTE(review): this is a single timed wait, not a loop. Any wakeup
      // while sending is still set takes the give-up path below.
      wait(PipelineAck.getOOBTimeout(ackStatus));
      // Didn't get my turn in time. Give up.
      if (sending) {
        throw new IOException("Could not send OOB reponse in time: "
            + ackStatus);
      }
    }
    sending = true;
  }

  LOG.info("Sending an out of band ack of type " + ackStatus);
  try {
    // No downstream ack is passed (null); the timing and offset arguments
    // are zero, and ackStatus is passed as the local status.
    sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
        ackStatus);
  } finally {
    // Let others send ack. Unless there are multiple OOB send
    // calls, there can be only one waiter, the responder thread.
    // In any case, only one needs to be notified.
    synchronized(this) {
      sending = false;
      notify();
    }
  }
}
项目:FlexMap    文件:BlockReceiver.java   
/**
 * The wrapper for the unprotected version. This is only called by
 * the responder's run() method.
 * <p>
 * Waits until the {@code sending} flag is clear, claims it, sends the ack
 * if the responder is still running, then clears the flag and notifies one
 * waiter. An interrupt during the wait is not rethrown; it only clears
 * {@code running}.
 *
 * @param ack Ack received from downstream
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myStatus the local ack status
 * @throws IOException propagated from {@code sendAckUpstreamUnprotected}
 */
private void sendAckUpstream(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock,
    Status myStatus) throws IOException {
  try {
    // Wait for other sender to finish. Unless there is an OOB being sent,
    // the responder won't have to wait.
    synchronized(this) {
      while(sending) {
        wait();
      }
      sending = true;
    }

    try {
      // Skip the send if the responder is no longer running; the finally
      // block below still releases the flag.
      if (!running) return;
      sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
          offsetInBlock, myStatus);
    } finally {
      // Release the flag and wake one waiting sender.
      synchronized(this) {
        sending = false;
        notify();
      }
    }
  } catch (InterruptedException ie) {
    // The responder was interrupted. Make it go down without
    // interrupting the receiver(writer) thread.
    running = false;
  }
}
项目:hadoop-on-lustre2    文件:BlockReceiver.java   
/**
 * Send an OOB response. If all acks have been sent already for the block
 * and the responder is about to close, the delivery is not guaranteed.
 * This is because the other end can close the connection independently.
 * An OOB coming from downstream will be automatically relayed upstream
 * by the responder. This method is used only by originating datanode.
 * <p>
 * Returns without sending when the responder is not running.
 *
 * @param ackStatus the type of ack to be sent
 * @throws IOException if the {@code sending} flag is still set after the
 *           timed wait, or propagated from the underlying send
 * @throws InterruptedException if interrupted while waiting for the
 *           {@code sending} flag to clear
 */
void sendOOBResponse(final Status ackStatus) throws IOException,
    InterruptedException {
  // Nothing to do once the responder has stopped.
  if (!running) {
    LOG.info("Cannot send OOB response " + ackStatus + 
        ". Responder not running.");
    return;
  }

  // Claim the sending flag under this object's monitor.
  synchronized(this) {
    if (sending) {
      // NOTE(review): this is a single timed wait, not a loop. Any wakeup
      // while sending is still set takes the give-up path below.
      wait(PipelineAck.getOOBTimeout(ackStatus));
      // Didn't get my turn in time. Give up.
      if (sending) {
        throw new IOException("Could not send OOB reponse in time: "
            + ackStatus);
      }
    }
    sending = true;
  }

  LOG.info("Sending an out of band ack of type " + ackStatus);
  try {
    // No downstream ack is passed (null); the timing and offset arguments
    // are zero, and ackStatus is passed as the local status.
    sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
        ackStatus);
  } finally {
    // Let others send ack. Unless there are multiple OOB send
    // calls, there can be only one waiter, the responder thread.
    // In any case, only one needs to be notified.
    synchronized(this) {
      sending = false;
      notify();
    }
  }
}
项目:hadoop-on-lustre2    文件:BlockReceiver.java   
/**
 * The wrapper for the unprotected version. This is only called by
 * the responder's run() method.
 * <p>
 * Waits until the {@code sending} flag is clear, claims it, sends the ack
 * if the responder is still running, then clears the flag and notifies one
 * waiter. An interrupt during the wait is not rethrown; it only clears
 * {@code running}.
 *
 * @param ack Ack received from downstream
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myStatus the local ack status
 * @throws IOException propagated from {@code sendAckUpstreamUnprotected}
 */
private void sendAckUpstream(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock,
    Status myStatus) throws IOException {
  try {
    // Wait for other sender to finish. Unless there is an OOB being sent,
    // the responder won't have to wait.
    synchronized(this) {
      while(sending) {
        wait();
      }
      sending = true;
    }

    try {
      // Skip the send if the responder is no longer running; the finally
      // block below still releases the flag.
      if (!running) return;
      sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
          offsetInBlock, myStatus);
    } finally {
      // Release the flag and wake one waiting sender.
      synchronized(this) {
        sending = false;
        notify();
      }
    }
  } catch (InterruptedException ie) {
    // The responder was interrupted. Make it go down without
    // interrupting the receiver(writer) thread.
    running = false;
  }
}
项目:hadoop    文件:BlockReceiver.java   
/**
 * Asks this receiver's packet responder to send the restart out-of-band
 * status obtained from {@link PipelineAck#getRestartOOBStatus()}.
 *
 * @throws IOException propagated from {@code sendOOBResponse}
 * @throws InterruptedException propagated from {@code sendOOBResponse}
 */
public void sendOOB() throws IOException, InterruptedException {
  final PacketResponder packetResponder =
      (PacketResponder) responder.getRunnable();
  packetResponder.sendOOBResponse(PipelineAck.getRestartOOBStatus());
}
项目:hadoop    文件:BlockReceiver.java   
/**
 * Builds the reply list for an ack, writes the ack to {@code upstreamOut}
 * and flushes it. This method contains no locking of its own.
 * <ul>
 *   <li>{@code ack == null}: the reply list is just {@code myHeader}.</li>
 *   <li>{@code mirrorError} set: the reply list is SUCCESS followed by
 *       ERROR, each combined with this datanode's ECN.</li>
 *   <li>otherwise: {@code myHeader} followed by the downstream header
 *       flags (none when this responder is LAST_IN_PIPELINE).</li>
 * </ul>
 *
 * @param ack Ack received from downstream
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myHeader the local ack header
 * @throws IOException if the first downstream reply is ERROR_CHECKSUM
 *           (thrown before anything is sent), if {@code myHeader} carries
 *           ERROR_CHECKSUM (thrown after the ack has been sent), or
 *           propagated from writing to {@code upstreamOut}
 */
private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock, int myHeader)
    throws IOException {
  final int[] replies;
  if (ack == null) {
    // A new OOB response is being sent from this node. Regardless of
    // downstream nodes, reply should contain one reply.
    replies = new int[] { myHeader };
  } else if (mirrorError) { // ack read error
    // Report SUCCESS in the first slot and ERROR in the second.
    int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
    int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
    replies = new int[] {h, h1};
  } else {
    // The last node in the pipeline ignores the replies in ack.
    short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
        .getNumOfReplies();
    // Local header first, then the downstream headers in their order.
    replies = new int[ackLen + 1];
    replies[0] = myHeader;
    for (int i = 0; i < ackLen; ++i) {
      replies[i + 1] = ack.getHeaderFlag(i);
    }
    // If the mirror has reported that it received a corrupt packet,
    // do self-destruct to mark myself bad, instead of making the
    // mirror node bad. The mirror is guaranteed to be good without
    // corrupt data on disk.
    if (ackLen > 0 && PipelineAck.getStatusFromHeader(replies[1]) ==
      Status.ERROR_CHECKSUM) {
      throw new IOException("Shutting down writer and responder "
          + "since the down streams reported the data sent by this "
          + "thread is corrupt");
    }
  }
  PipelineAck replyAck = new PipelineAck(seqno, replies,
      totalAckTimeNanos);
  // Advance the acked byte count only for a successful ack that moves it
  // forward.
  if (replyAck.isSuccess()
      && offsetInBlock > replicaInfo.getBytesAcked()) {
    replicaInfo.setBytesAcked(offsetInBlock);
  }
  // send my ack back to upstream datanode
  long begin = Time.monotonicNow();
  replyAck.write(upstreamOut);
  upstreamOut.flush();
  // Time spent in write + flush, compared against the slow-log threshold.
  long duration = Time.monotonicNow() - begin;
  if (duration > datanodeSlowLogThresholdMs) {
    LOG.warn("Slow PacketResponder send ack to upstream took " + duration
        + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
        + ", replyAck=" + replyAck);
  } else if (LOG.isDebugEnabled()) {
    LOG.debug(myString + ", replyAck=" + replyAck);
  }

  // If a corruption was detected in the received data, terminate after
  // sending ERROR_CHECKSUM back.
  Status myStatus = PipelineAck.getStatusFromHeader(myHeader);
  if (myStatus == Status.ERROR_CHECKSUM) {
    throw new IOException("Shutting down writer and responder "
        + "due to a checksum error in received data. The error "
        + "response has been sent upstream.");
  }
}
项目:aliyun-oss-hadoop-fs    文件:BlockReceiver.java   
/**
 * Asks the packet responder to send an out-of-band response carrying the
 * status returned by {@code PipelineAck.getRestartOOBStatus()}.
 *
 * @throws IOException declared for failures raised while sending the response
 * @throws InterruptedException declared for interruption raised while
 *           sending the response
 */
public void sendOOB() throws IOException, InterruptedException {
  // The responder's runnable is resolved and cast before the status is
  // looked up, matching the evaluation order of a single chained call.
  final PacketResponder packetResponder =
      (PacketResponder) responder.getRunnable();
  packetResponder.sendOOBResponse(PipelineAck.getRestartOOBStatus());
}
项目:aliyun-oss-hadoop-fs    文件:BlockReceiver.java   
/**
 * Assembles the acknowledgement for one packet and writes it to
 * {@code upstreamOut}.
 * <p>
 * The list of reply headers is chosen from three cases: with no downstream
 * ack, only the local header is sent; when {@code mirrorError} is set, a
 * fixed pair (SUCCESS followed by ERROR) is sent; otherwise the local header
 * is followed by the header flags copied from the downstream ack, unless
 * this responder is the last in the pipeline.
 *
 * @param ack Ack received from downstream, or null when this node
 *          originates the response itself
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myHeader the local ack header
 * @throws IOException if the first downstream reply or the local header
 *           carries ERROR_CHECKSUM; also declared for failures while
 *           writing the ack to {@code upstreamOut}
 */
private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock, int myHeader)
    throws IOException {
  final int[] headers;
  if (ack == null) {
    // Locally originated OOB response: exactly one reply, whatever the
    // downstream nodes are doing.
    headers = new int[] { myHeader };
  } else if (mirrorError) {
    // The ack from the mirror could not be read.
    headers = new int[] {
        PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS),
        PipelineAck.combineHeader(datanode.getECN(), Status.ERROR) };
  } else {
    // The last node in the pipeline has no downstream replies to forward.
    short downstreamCount = 0;
    if (type != PacketResponderType.LAST_IN_PIPELINE) {
      downstreamCount = ack.getNumOfReplies();
    }
    headers = new int[downstreamCount + 1];
    headers[0] = myHeader;
    for (int slot = 1; slot <= downstreamCount; slot++) {
      headers[slot] = ack.getHeaderFlag(slot - 1);
    }
    // A checksum error reported by the mirror means the data this node
    // forwarded was corrupt. Fail here so that this node, not the mirror,
    // is marked bad; the mirror is guaranteed to hold no corrupt data on
    // disk.
    final boolean mirrorSawCorruption = downstreamCount > 0
        && PipelineAck.getStatusFromHeader(headers[1])
            == Status.ERROR_CHECKSUM;
    if (mirrorSawCorruption) {
      throw new IOException("Shutting down writer and responder "
          + "since the down streams reported the data sent by this "
          + "thread is corrupt");
    }
  }

  final PipelineAck replyAck =
      new PipelineAck(seqno, headers, totalAckTimeNanos);
  // Advance the acked byte count only for a successful ack that moves it
  // forward.
  if (replyAck.isSuccess()) {
    if (offsetInBlock > replicaInfo.getBytesAcked()) {
      replicaInfo.setBytesAcked(offsetInBlock);
    }
  }

  // Write the ack upstream, timing the write and flush together.
  final long sendStart = Time.monotonicNow();
  replyAck.write(upstreamOut);
  upstreamOut.flush();
  final long elapsedMs = Time.monotonicNow() - sendStart;
  final boolean slowSend = elapsedMs > datanodeSlowLogThresholdMs;
  if (slowSend) {
    LOG.warn("Slow PacketResponder send ack to upstream took " + elapsedMs
        + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
        + ", replyAck=" + replyAck);
  } else if (LOG.isDebugEnabled()) {
    LOG.debug(myString + ", replyAck=" + replyAck);
  }

  // A locally detected checksum error is reported upstream first; only
  // then does this method terminate with an exception.
  if (PipelineAck.getStatusFromHeader(myHeader) == Status.ERROR_CHECKSUM) {
    throw new IOException("Shutting down writer and responder "
        + "due to a checksum error in received data. The error "
        + "response has been sent upstream.");
  }
}
项目:big-c    文件:BlockReceiver.java   
/**
 * Sends an out-of-band response through the packet responder, using the
 * status obtained from {@code PipelineAck.getRestartOOBStatus()}.
 *
 * @throws IOException declared for failures raised while sending the response
 * @throws InterruptedException declared for interruption raised while
 *           sending the response
 */
public void sendOOB() throws IOException, InterruptedException {
  // Resolve the responder before looking up the status so the order of
  // evaluation is the same as in a single chained expression.
  final PacketResponder oobSender =
      (PacketResponder) responder.getRunnable();
  oobSender.sendOOBResponse(PipelineAck.getRestartOOBStatus());
}
项目:big-c    文件:BlockReceiver.java   
/**
 * Builds the reply for a single packet and sends it to
 * {@code upstreamOut}.
 * <p>
 * Reply headers come from one of three sources: the local header alone when
 * there is no downstream ack; a fixed SUCCESS/ERROR pair when
 * {@code mirrorError} is set; or the local header followed by every header
 * flag of the downstream ack (none when this responder is last in the
 * pipeline).
 *
 * @param ack Ack received from downstream, or null when this node
 *          originates the response itself
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myHeader the local ack header
 * @throws IOException if the first downstream reply or the local header
 *           carries ERROR_CHECKSUM; also declared for failures while
 *           writing the ack to {@code upstreamOut}
 */
private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock, int myHeader)
    throws IOException {
  final int[] replyHeaders;
  if (ack == null) {
    // This node originates a new OOB response; it always carries exactly
    // one reply regardless of the downstream nodes.
    replyHeaders = new int[] { myHeader };
  } else if (mirrorError) {
    // Reading the ack from the mirror failed.
    final int selfHeader =
        PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
    final int mirrorHeader =
        PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
    replyHeaders = new int[] { selfHeader, mirrorHeader };
  } else {
    final boolean lastInPipeline =
        type == PacketResponderType.LAST_IN_PIPELINE;
    final short forwarded = lastInPipeline ? 0 : ack.getNumOfReplies();
    replyHeaders = new int[forwarded + 1];
    replyHeaders[0] = myHeader;
    int target = 1;
    for (int source = 0; source < forwarded; source++) {
      replyHeaders[target] = ack.getHeaderFlag(source);
      target++;
    }
    // When the mirror reports a corrupt packet, the corruption came from
    // this node. Self-destruct so this node is marked bad rather than the
    // mirror, which is guaranteed to have no corrupt data on disk.
    if (forwarded > 0) {
      final Status mirrorStatus =
          PipelineAck.getStatusFromHeader(replyHeaders[1]);
      if (mirrorStatus == Status.ERROR_CHECKSUM) {
        throw new IOException("Shutting down writer and responder "
            + "since the down streams reported the data sent by this "
            + "thread is corrupt");
      }
    }
  }

  final PipelineAck replyAck =
      new PipelineAck(seqno, replyHeaders, totalAckTimeNanos);
  // Only a successful ack may move the acked byte count, and only forward.
  if (replyAck.isSuccess()
      && replicaInfo.getBytesAcked() < offsetInBlock) {
    replicaInfo.setBytesAcked(offsetInBlock);
  }

  // Send the ack to the upstream datanode and time the write plus flush.
  final long startedAt = Time.monotonicNow();
  replyAck.write(upstreamOut);
  upstreamOut.flush();
  final long tookMs = Time.monotonicNow() - startedAt;
  if (tookMs > datanodeSlowLogThresholdMs) {
    LOG.warn("Slow PacketResponder send ack to upstream took " + tookMs
        + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
        + ", replyAck=" + replyAck);
  } else if (LOG.isDebugEnabled()) {
    LOG.debug(myString + ", replyAck=" + replyAck);
  }

  // Corruption detected locally: the ERROR_CHECKSUM reply has already been
  // written above, so terminate now.
  final Status localStatus = PipelineAck.getStatusFromHeader(myHeader);
  if (Status.ERROR_CHECKSUM == localStatus) {
    throw new IOException("Shutting down writer and responder "
        + "due to a checksum error in received data. The error "
        + "response has been sent upstream.");
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:BlockReceiver.java   
/**
 * Hands the status from {@code PipelineAck.getRestartOOBStatus()} to the
 * packet responder so it can send an out-of-band response.
 *
 * @throws IOException declared for failures raised while sending the response
 * @throws InterruptedException declared for interruption raised while
 *           sending the response
 */
public void sendOOB() throws IOException, InterruptedException {
  // The runnable is fetched and cast first, then the status is looked up,
  // exactly as a single chained call would evaluate them.
  final PacketResponder packetResponder =
      (PacketResponder) responder.getRunnable();
  packetResponder.sendOOBResponse(PipelineAck.getRestartOOBStatus());
}
项目:hadoop-2.6.0-cdh5.4.3    文件:BlockReceiver.java   
/**
 * Assembles the acknowledgement for one packet and writes it to
 * {@code upstreamOut}.
 * <p>
 * The reply statuses are chosen from three cases: with no downstream ack,
 * only the local status is sent; when {@code mirrorError} is set, the
 * shared {@code MIRROR_ERROR_STATUS} array is sent; otherwise the local
 * status is followed by the replies copied from the downstream ack, unless
 * this responder is the last in the pipeline.
 *
 * @param ack Ack received from downstream, or null when this node
 *          originates the response itself
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myStatus the local ack status
 * @throws IOException if the first downstream reply or the local status is
 *           ERROR_CHECKSUM; also declared for failures while writing the
 *           ack to {@code upstreamOut}
 */
private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock, Status myStatus)
    throws IOException {
  final Status[] statuses;
  if (ack == null) {
    // Locally originated OOB response: exactly one reply, whatever the
    // downstream nodes are doing.
    statuses = new Status[] { myStatus };
  } else if (mirrorError) {
    // The ack from the mirror could not be read.
    statuses = MIRROR_ERROR_STATUS;
  } else {
    // The last node in the pipeline has no downstream replies to forward.
    short downstreamCount = 0;
    if (type != PacketResponderType.LAST_IN_PIPELINE) {
      downstreamCount = ack.getNumOfReplies();
    }
    statuses = new Status[downstreamCount + 1];
    statuses[0] = myStatus;
    for (int slot = 1; slot <= downstreamCount; slot++) {
      statuses[slot] = ack.getReply(slot - 1);
    }
    // A checksum error reported by the mirror means the data this node
    // forwarded was corrupt. Fail here so that this node, not the mirror,
    // is marked bad; the mirror is guaranteed to hold no corrupt data on
    // disk.
    final boolean mirrorSawCorruption =
        downstreamCount > 0 && statuses[1] == Status.ERROR_CHECKSUM;
    if (mirrorSawCorruption) {
      throw new IOException("Shutting down writer and responder "
          + "since the down streams reported the data sent by this "
          + "thread is corrupt");
    }
  }

  final PipelineAck replyAck =
      new PipelineAck(seqno, statuses, totalAckTimeNanos);
  // Advance the acked byte count only for a successful ack that moves it
  // forward.
  if (replyAck.isSuccess()) {
    if (offsetInBlock > replicaInfo.getBytesAcked()) {
      replicaInfo.setBytesAcked(offsetInBlock);
    }
  }

  // Write the ack upstream, timing the write and flush together.
  final long sendStart = Time.monotonicNow();
  replyAck.write(upstreamOut);
  upstreamOut.flush();
  final long elapsedMs = Time.monotonicNow() - sendStart;
  final boolean slowSend = elapsedMs > datanodeSlowLogThresholdMs;
  if (slowSend) {
    LOG.warn("Slow PacketResponder send ack to upstream took " + elapsedMs
        + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
        + ", replyAck=" + replyAck);
  } else if (LOG.isDebugEnabled()) {
    LOG.debug(myString + ", replyAck=" + replyAck);
  }

  // A locally detected checksum error is reported upstream first; only
  // then does this method terminate with an exception.
  if (Status.ERROR_CHECKSUM == myStatus) {
    throw new IOException("Shutting down writer and responder "
        + "due to a checksum error in received data. The error "
        + "response has been sent upstream.");
  }
}
项目:FlexMap    文件:BlockReceiver.java   
/**
 * Triggers an out-of-band response on the packet responder, passing the
 * status returned by {@code PipelineAck.getRestartOOBStatus()}.
 *
 * @throws IOException declared for failures raised while sending the response
 * @throws InterruptedException declared for interruption raised while
 *           sending the response
 */
public void sendOOB() throws IOException, InterruptedException {
  // Resolve the responder before looking up the status so the order of
  // evaluation is the same as in a single chained expression.
  final PacketResponder oobSender =
      (PacketResponder) responder.getRunnable();
  oobSender.sendOOBResponse(PipelineAck.getRestartOOBStatus());
}
项目:FlexMap    文件:BlockReceiver.java   
/**
 * Builds the reply for a single packet and sends it to
 * {@code upstreamOut}.
 * <p>
 * Reply statuses come from one of three sources: the local status alone
 * when there is no downstream ack; the shared {@code MIRROR_ERROR_STATUS}
 * array when {@code mirrorError} is set; or the local status followed by
 * every reply of the downstream ack (none when this responder is last in
 * the pipeline).
 *
 * @param ack Ack received from downstream, or null when this node
 *          originates the response itself
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myStatus the local ack status
 * @throws IOException if the first downstream reply or the local status is
 *           ERROR_CHECKSUM; also declared for failures while writing the
 *           ack to {@code upstreamOut}
 */
private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock, Status myStatus)
    throws IOException {
  final Status[] replyStatuses;
  if (ack == null) {
    // This node originates a new OOB response; it always carries exactly
    // one reply regardless of the downstream nodes.
    replyStatuses = new Status[] { myStatus };
  } else if (mirrorError) {
    // Reading the ack from the mirror failed.
    replyStatuses = MIRROR_ERROR_STATUS;
  } else {
    final boolean lastInPipeline =
        type == PacketResponderType.LAST_IN_PIPELINE;
    final short forwarded = lastInPipeline ? 0 : ack.getNumOfReplies();
    replyStatuses = new Status[forwarded + 1];
    replyStatuses[0] = myStatus;
    int target = 1;
    for (int source = 0; source < forwarded; source++) {
      replyStatuses[target] = ack.getReply(source);
      target++;
    }
    // When the mirror reports a corrupt packet, the corruption came from
    // this node. Self-destruct so this node is marked bad rather than the
    // mirror, which is guaranteed to have no corrupt data on disk.
    if (forwarded > 0) {
      if (replyStatuses[1] == Status.ERROR_CHECKSUM) {
        throw new IOException("Shutting down writer and responder "
            + "since the down streams reported the data sent by this "
            + "thread is corrupt");
      }
    }
  }

  final PipelineAck replyAck =
      new PipelineAck(seqno, replyStatuses, totalAckTimeNanos);
  // Only a successful ack may move the acked byte count, and only forward.
  if (replyAck.isSuccess()
      && replicaInfo.getBytesAcked() < offsetInBlock) {
    replicaInfo.setBytesAcked(offsetInBlock);
  }

  // Send the ack to the upstream datanode and time the write plus flush.
  final long startedAt = Time.monotonicNow();
  replyAck.write(upstreamOut);
  upstreamOut.flush();
  final long tookMs = Time.monotonicNow() - startedAt;
  if (tookMs > datanodeSlowLogThresholdMs) {
    LOG.warn("Slow PacketResponder send ack to upstream took " + tookMs
        + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
        + ", replyAck=" + replyAck);
  } else if (LOG.isDebugEnabled()) {
    LOG.debug(myString + ", replyAck=" + replyAck);
  }

  // Corruption detected locally: the ERROR_CHECKSUM reply has already been
  // written above, so terminate now.
  if (myStatus == Status.ERROR_CHECKSUM) {
    throw new IOException("Shutting down writer and responder "
        + "due to a checksum error in received data. The error "
        + "response has been sent upstream.");
  }
}
项目:hadoop-on-lustre2    文件:BlockReceiver.java   
/**
 * Assembles the acknowledgement for one packet and writes it to
 * {@code upstreamOut}.
 * <p>
 * The reply statuses are chosen from three cases: with no downstream ack,
 * only the local status is sent; when {@code mirrorError} is set, the
 * shared {@code MIRROR_ERROR_STATUS} array is sent; otherwise the local
 * status is followed by the replies copied from the downstream ack, unless
 * this responder is the last in the pipeline.
 *
 * @param ack Ack received from downstream, or null when this node
 *          originates the response itself
 * @param seqno sequence number of ack to be sent upstream
 * @param totalAckTimeNanos total ack time including all the downstream
 *          nodes
 * @param offsetInBlock offset in block for the data in packet
 * @param myStatus the local ack status
 * @throws IOException if the first downstream reply or the local status is
 *           ERROR_CHECKSUM; also declared for failures while writing the
 *           ack to {@code upstreamOut}
 */
private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
    long totalAckTimeNanos, long offsetInBlock, Status myStatus)
    throws IOException {
  final Status[] statuses;
  if (ack == null) {
    // Locally originated OOB response: exactly one reply, whatever the
    // downstream nodes are doing.
    statuses = new Status[] { myStatus };
  } else if (mirrorError) {
    // The ack from the mirror could not be read.
    statuses = MIRROR_ERROR_STATUS;
  } else {
    // The last node in the pipeline has no downstream replies to forward.
    short downstreamCount = 0;
    if (type != PacketResponderType.LAST_IN_PIPELINE) {
      downstreamCount = ack.getNumOfReplies();
    }
    statuses = new Status[downstreamCount + 1];
    statuses[0] = myStatus;
    for (int slot = 1; slot <= downstreamCount; slot++) {
      statuses[slot] = ack.getReply(slot - 1);
    }
    // A checksum error reported by the mirror means the data this node
    // forwarded was corrupt. Fail here so that this node, not the mirror,
    // is marked bad; the mirror is guaranteed to hold no corrupt data on
    // disk.
    final boolean mirrorSawCorruption =
        downstreamCount > 0 && statuses[1] == Status.ERROR_CHECKSUM;
    if (mirrorSawCorruption) {
      throw new IOException("Shutting down writer and responder "
          + "since the down streams reported the data sent by this "
          + "thread is corrupt");
    }
  }

  final PipelineAck replyAck =
      new PipelineAck(seqno, statuses, totalAckTimeNanos);
  // Advance the acked byte count only for a successful ack that moves it
  // forward.
  if (replyAck.isSuccess()) {
    if (offsetInBlock > replicaInfo.getBytesAcked()) {
      replicaInfo.setBytesAcked(offsetInBlock);
    }
  }

  // Write the ack to the upstream datanode.
  replyAck.write(upstreamOut);
  upstreamOut.flush();
  if (LOG.isDebugEnabled()) {
    LOG.debug(myString + ", replyAck=" + replyAck);
  }

  // A locally detected checksum error is reported upstream first; only
  // then does this method terminate with an exception.
  if (Status.ERROR_CHECKSUM == myStatus) {
    throw new IOException("Shutting down writer and responder "
        + "due to a checksum error in received data. The error "
        + "response has been sent upstream.");
  }
}
项目:hadoop    文件:DataNode.java   
/**
 * Returns the ECN value this DataNode advertises. The intended contract is:
 * <ul>
 *   <li>ECN.DISABLED when ECN is disabled.</li>
 *   <li>ECN.SUPPORTED when ECN is enabled but the DN still has capacity.</li>
 *   <li>ECN.CONGESTED when ECN is enabled and the DN is congested.</li>
 * </ul>
 * This implementation decides solely on {@code pipelineSupportECN} and
 * therefore only ever returns ECN.SUPPORTED or ECN.DISABLED; it never
 * reports ECN.CONGESTED.
 *
 * @return ECN.SUPPORTED when {@code pipelineSupportECN} is true, otherwise
 *         ECN.DISABLED
 */
public PipelineAck.ECN getECN() {
  if (pipelineSupportECN) {
    return PipelineAck.ECN.SUPPORTED;
  }
  return PipelineAck.ECN.DISABLED;
}