/**
 * Releases the connection resources held by this socket.
 *
 * <p>Under {@code connectLock}: a recorded {@code connectFuture} is cancelled
 * and forgotten, and an open {@code channel} is closed (waiting on the close
 * future via {@code awaitUninterruptibly()}) and forgotten. After the lock is
 * released, every wake-up sentinel is dropped from {@code outgoingQueue};
 * all other queued packets are left untouched.
 */
@Override
void cleanup() {
    connectLock.lock();
    try {
        // Abandon a connect attempt that may still be in flight.
        if (connectFuture != null) {
            connectFuture.cancel();
            connectFuture = null;
        }
        // Close the channel and wait for the close to complete.
        if (channel != null) {
            channel.close().awaitUninterruptibly();
            channel = null;
        }
    } finally {
        connectLock.unlock();
    }

    // Purge wake-up sentinels; they are compared by identity.
    for (Iterator<Packet> it = outgoingQueue.iterator(); it.hasNext();) {
        Packet queued = it.next();
        if (queued == WakeupPacket.getInstance()) {
            it.remove();
        }
    }
}
/**
 * Writes {@code p}, followed by every packet still queued in
 * {@code outgoingQueue}, to the server.
 *
 * <p>Wake-up sentinels are skipped. A packet that carries a request header
 * whose type is neither {@code ping} nor {@code auth} is stamped with the next
 * xid from {@code cnxn} and appended to {@code pendingQueue} (under that
 * queue's monitor) before it is sent.
 *
 * @param pendingQueue queue receiving the packets that were stamped with an xid
 * @param p            first packet to process
 * @param cnxn         source of xids
 */
private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
    updateNow();
    // The update clause pulls the next packet; the emptiness check at the end
    // of the body guarantees remove() is only reached on a non-empty queue.
    for (Packet current = p; ; current = outgoingQueue.remove()) {
        if (current != WakeupPacket.getInstance()) {
            boolean needsXid = current.requestHeader != null
                    && current.requestHeader.getType() != ZooDefs.OpCode.ping
                    && current.requestHeader.getType() != ZooDefs.OpCode.auth;
            if (needsXid) {
                current.requestHeader.setXid(cnxn.getXid());
                synchronized (pendingQueue) {
                    pendingQueue.add(current);
                }
            }
            sendPkt(current);
        }
        if (outgoingQueue.isEmpty()) {
            break;
        }
    }
}
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue, boolean clientTunneledAuthenticationInProgress) { synchronized (outgoingQueue) { if (outgoingQueue.isEmpty()) { return null; } if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish || !clientTunneledAuthenticationInProgress) { return outgoingQueue.getFirst(); } // Since client's authentication with server is in progress, // send only the null-header packet queued by primeConnection(). // This packet must be sent so that the SASL authentication process // can proceed, but all other packets should wait until // SASL authentication completes. ListIterator<Packet> iter = outgoingQueue.listIterator(); while (iter.hasNext()) { Packet p = iter.next(); if (p.requestHeader == null) { // We've found the priming-packet. Move it to the beginning of the queue. iter.remove(); outgoingQueue.add(0, p); return p; } else { // Non-priming packet: defer it until later, leaving it in the queue // until authentication completes. if (LOG.isDebugEnabled()) { LOG.debug("deferring non-priming packet: " + p + "until SASL authentication completes."); } } } // no sendable packet found. return null; } }
@Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, outgoingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { synchronized(outgoingQueue) { if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { enableWrite(); } } } selected.clear(); }
/**
 * Writes one packet straight to the socket channel behind {@code sockKey}.
 *
 * <p>The packet's buffer is created with {@code createBB()} and handed to a
 * single {@code write} call; the number of bytes written is not inspected.
 *
 * @param p packet to send
 * @throws IOException with message "Socket is null!" when the key has no
 *         channel, or whatever the channel write raises
 */
@Override
void sendPacket(Packet p) throws IOException {
    final SocketChannel socket = (SocketChannel) sockKey.channel();
    if (socket == null) {
        throw new IOException("Socket is null!");
    }
    p.createBB();
    socket.write(p.bb);
}
private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) { if (outgoingQueue.isEmpty()) { return null; } // If we've already starting sending the first packet, we better finish if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) { return outgoingQueue.getFirst(); } // Since client's authentication with server is in progress, // send only the null-header packet queued by primeConnection(). // This packet must be sent so that the SASL authentication process // can proceed, but all other packets should wait until // SASL authentication completes. Iterator<Packet> iter = outgoingQueue.iterator(); while (iter.hasNext()) { Packet p = iter.next(); if (p.requestHeader == null) { // We've found the priming-packet. Move it to the beginning of the queue. iter.remove(); outgoingQueue.addFirst(p); return p; } else { // Non-priming packet: defer it until later, leaving it in the queue // until authentication completes. LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p); } } return null; }
@Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); updateSocketAddresses(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { enableWrite(); } } selected.clear(); }
@Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { try { if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) { return; } Packet head = null; if (needSasl.get()) { if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) { return; } } else { if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) { return; } } // check if being waken up on closing. if (!sendThread.getZkState().isAlive()) { // adding back the patck to notify of failure in conLossPacket(). addBack(head); return; } // channel disconnection happened if (disconnected.get()) { addBack(head); throw new EndOfStreamException("channel for sessionid 0x" + Long.toHexString(sessionId) + " is lost"); } if (head != null) { doWrite(pendingQueue, head, cnxn); } } finally { updateNow(); } }
private void sendPkt(Packet p) { // Assuming the packet will be sent out successfully. Because if it fails, // the channel will close and clean up queues. p.createBB(); updateLastSend(); sentCount++; channel.write(ChannelBuffers.wrappedBuffer(p.bb)); }
/**
 * Sends a single packet through the current channel by delegating to
 * {@code sendPkt}.
 *
 * @param p packet to send
 * @throws IOException with message "channel has been closed" when no channel
 *         is currently set
 */
@Override
void sendPacket(ClientCnxn.Packet p) throws IOException {
    if (channel != null) {
        sendPkt(p);
        return;
    }
    throw new IOException("channel has been closed");
}
@Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue ) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { if (outgoingQueue.size() > 0) { // We have something to send so it's the same // as if we do the send now. updateLastSend(); } if (doIO(pendingQueue, outgoingQueue)) { updateLastHeard(); } } } if (sendThread.getZkState().isConnected()) { if (outgoingQueue.size() > 0) { enableWrite(); } else { disableWrite(); } } selected.clear(); }