@Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { try { if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) { return; } Packet head = null; if (needSasl.get()) { if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) { return; } } else { if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) { return; } } // check if being waken up on closing. if (!sendThread.getZkState().isAlive()) { // adding back the patck to notify of failure in conLossPacket(). addBack(head); return; } // channel disconnection happened if (disconnected.get()) { addBack(head); throw new EndOfStreamException("channel for sessionid 0x" + Long.toHexString(sessionId) + " is lost"); } if (head != null) { doWrite(pendingQueue, head, cnxn); } } finally { updateNow(); } }
/**
 * Performs one round of non-blocking I/O on the channel behind {@code sockKey}.
 *
 * <p>When the key is readable, bytes are read into {@code incomingBuffer}. Once that buffer
 * has no space remaining it is flipped and dispatched: to {@code readLength()} when it is
 * the length buffer, to {@code readConnectResult()} when the session is not yet
 * {@code initialized}, and to {@code sendThread.readResponse} otherwise.
 *
 * <p>When the key is writable, the packet at the head of {@code outgoingQueue} is written.
 * A fully written packet is removed from the queue and, if it carries a request header
 * whose type is neither ping nor auth, appended to {@code pendingQueue}.
 *
 * <p>Finally, write interest is disabled when {@code outgoingQueue} is empty and enabled
 * otherwise. Every read of {@code outgoingQueue} happens while holding its monitor, the
 * same lock the write path uses, so the interest decision is never based on an
 * unsynchronized view of the list.
 * NOTE(review): this assumes producers also lock {@code outgoingQueue} when enqueuing;
 * confirm against the callers.
 *
 * @param pendingQueue  receives packets that were fully sent and expect a reply
 * @param outgoingQueue packets waiting to be written; used as the lock for queue access
 * @return true if a packet was received
 * @throws InterruptedException declared for callers; not raised directly by the code
 *                              visible in this method
 * @throws IOException          if the channel is null, the peer closed the connection
 *                              ({@code EndOfStreamException}), or socket I/O fails
 */
boolean doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue)
        throws InterruptedException, IOException {
    boolean packetReceived = false;
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }

    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from server sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount++;
                readLength();
            } else if (!initialized) {
                readConnectResult();
                enableRead();
                // Inspect the queue under its monitor, like the write path below.
                synchronized (outgoingQueue) {
                    if (!outgoingQueue.isEmpty()) {
                        enableWrite();
                    }
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                packetReceived = true;
                initialized = true;
            } else {
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                packetReceived = true;
            }
        }
    }

    // The write attempt and the write-interest update form one critical section, so the
    // emptiness check reflects the queue state left by the write.
    synchronized (outgoingQueue) {
        if (sockKey.isWritable() && !outgoingQueue.isEmpty()) {
            ByteBuffer pbb = outgoingQueue.getFirst().bb;
            sock.write(pbb);
            // A partial write leaves the packet at the head for the next round.
            if (!pbb.hasRemaining()) {
                sentCount++;
                Packet p = outgoingQueue.removeFirst();
                if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                    pendingQueue.add(p);
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
            disableWrite();
        } else {
            enableWrite();
        }
    }
    return packetReceived;
}