@Override public void run() { final Selector tSelector = this.selector; for (; ; ) { try { tSelector.select(1000L); connect(tSelector); Set<SelectionKey> keys = tSelector.selectedKeys(); try { for (SelectionKey key : keys) { Object att = key.attachment(); if (att != null && key.isValid() && key.isConnectable()) { finishConnect(key, att); } else { key.cancel(); } } } finally { keys.clear(); } } catch (Exception e) { LOGGER.info(name, e); } } }
/** * 启动client服务 * @throws IOException */ public void start(int port) throws IOException { //连接端口 InetSocketAddress inetSocketAddress = new InetSocketAddress(port); System.out.println("Socket Connect To " + inetSocketAddress.getAddress().toString() + ":" + inetSocketAddress.getPort()); socketChannel = SocketChannel.open(inetSocketAddress); //设置非阻塞模式 socketChannel.configureBlocking(false); //写出客户端标识 sendMessage("I am is Falcon Agent Client"); if(selector == null){ //与服务器的连接建立成功 selector = Selector.open(); } }
public FDTSelectionKey register(final UUID fdtsessionID, final SocketChannel channel, final int interests, final SelectionHandler selectionHandler, final FDTKeyAttachement attach) throws InterruptedException { if (channel == null) { throw new NullPointerException("SocketChannel cannot be null"); } if (selectionHandler == null) { throw new NullPointerException("SelectionHanfler cannot be null"); } final Selector sel = getAndRotateSelector(); final SelectionTask sTask = selTasksMap.get(sel); FDTSelectionKey fdtSelectionKey = new FDTSelectionKey(fdtsessionID, channel, interests, selectionHandler, attach, sel, sTask); return fdtSelectionKey; }
private void connect(Selector selector) { AbstractConnection c = null; while ((c = connectQueue.poll()) != null) { try { SocketChannel channel = (SocketChannel) c.getChannel(); // 注册 OP_CONNECT(建立连接) 监听与后端连接是否真正建立 // 监听到之后是图-MySql第3步,(TCP连接建立) channel.register(selector, SelectionKey.OP_CONNECT, c); // 主动连接 阻塞或者非阻塞 // 图-MySql第1步,(TCP连接请求) channel.connect(new InetSocketAddress(c.host, c.port)); } catch (Exception e) { LOGGER.error("error:",e); c.close(e.toString()); } } }
private void register(Selector selector) { AbstractConnection c = null; if (registerQueue.isEmpty()) { return; } while ((c = registerQueue.poll()) != null) { try { // 注册读事件 ((NIOSocketWR) c.getSocketWR()).register(selector); // 连接注册,对于FrontendConnection是发送HandshakePacket并异步读取响应 // 响应为AuthPacket,读取其中的信息,验证用户名密码等信息,如果符合条件 // 则发送OkPacket c.register(); } catch (Exception e) { c.close("register err" + e.toString()); LOGGER.error("register err", e); } } }
@Override public void flush() throws IOException { NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment) nioChannel.getAttachment(); if (att == null) { throw new IOException("Key must be cancelled"); } long writeTimeout = att.getTimeout(); Selector selector = null; try { selector = pool.get(); } catch (IOException x) { // ignore } try { do { if (nioChannel.flush(true, selector, writeTimeout)) { break; } } while (true); } finally { if (selector != null) { pool.put(selector); } } }
/** * Close Selector. * * @see org.apache.catalina.tribes.transport.ReceiverBase#stop() */ protected void stopListening() { setListen(false); Selector selector = this.selector.get(); if (selector != null) { try { // Unlock the thread if is is blocked waiting for input selector.wakeup(); // Wait for the receiver thread to finish int count = 0; while (running && count < 50) { Thread.sleep(100); count ++; } if (running) { log.warn(sm.getString("NioReceiver.stop.threadRunning")); } closeSelector(); } catch (Exception x) { log.error("Unable to close cluster receiver selector.", x); } finally { this.selector.set(null); } } }
@Override public void run() { final Selector selector = this.selector; for (;;) { ++acceptCount; try { selector.select( 1000L ); Set<SelectionKey> keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { accept(); } else { key.cancel(); } } } finally { keys.clear(); } } catch (Throwable e) { LOGGER.warn(getName(), e); } } }
/** * * @param bytebuffer ByteBuffer * @param flip boolean * @return int * @throws IOException * TODO Fix non blocking write properly */ private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException { if ( flip ) bytebuffer.flip(); int written = 0; NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(); if ( att == null ) throw new IOException("Key must be cancelled"); long writeTimeout = att.getWriteTimeout(); Selector selector = null; try { selector = pool.get(); } catch ( IOException x ) { //ignore } try { written = pool.write(bytebuffer, socket, selector, writeTimeout, block); //make sure we are flushed do { if (socket.flush(true,selector,writeTimeout)) break; }while ( true ); }finally { if ( selector != null ) pool.put(selector); } if ( block ) bytebuffer.clear(); //only clear return written; }
protected Selector getSharedSelector() throws IOException { if (SHARED && SHARED_SELECTOR == null) { synchronized ( NioSelectorPool.class ) { if ( SHARED_SELECTOR == null ) { synchronized (Selector.class) { // Selector.open() isn't thread safe // http://bugs.sun.com/view_bug.do?bug_id=6427854 // Affects 1.6.0_29, fixed in 1.7.0_01 SHARED_SELECTOR = Selector.open(); } log.info("Using a shared selector for servlet write/read"); } } } return SHARED_SELECTOR; }
protected Selector getSharedSelector() throws IOException { if (SHARED && SHARED_SELECTOR == null) { synchronized (NioSelectorPool.class) { if (SHARED_SELECTOR == null) { synchronized (Selector.class) { // Selector.open() isn't thread safe // http://bugs.sun.com/view_bug.do?bug_id=6427854 // Affects 1.6.0_29, fixed in 1.7.0_01 SHARED_SELECTOR = Selector.open(); } log.info("Using a shared selector for servlet write/read"); } } } return SHARED_SELECTOR; }
public TunnelServer(int port, Selector selector) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // ServerSocketChannel.bind() requires API 24 serverSocketChannel.socket().bind(new InetSocketAddress(Inet4Address.getLoopbackAddress(), port)); SelectionHandler socketChannelHandler = (selectionKey) -> { try { ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); acceptClient(selector, channel); } catch (IOException e) { Log.e(TAG, "Cannot accept client", e); } }; serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, socketChannelHandler); }
public UDPConnection(ConnectionId id, Client client, Selector selector, IPv4Header ipv4Header, UDPHeader udpHeader) throws IOException { super(id, client); networkToClient = new Packetizer(ipv4Header, udpHeader); networkToClient.getResponseIPv4Header().swapSourceAndDestination(); networkToClient.getResponseTransportHeader().swapSourceAndDestination(); touch(); SelectionHandler selectionHandler = (selectionKey) -> { touch(); if (selectionKey.isValid() && selectionKey.isReadable()) { processReceive(); } if (selectionKey.isValid() && selectionKey.isWritable()) { processSend(); } updateInterests(); }; channel = createChannel(); interests = SelectionKey.OP_READ; selectionKey = channel.register(selector, interests, selectionHandler); }
public Listener(final String name) throws IOException { super(name); backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the binding addrees (can be different from the default interface) bind(acceptChannel.socket(), bindAddress, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads, new ThreadFactoryBuilder().setNameFormat( "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port).setDaemon(true).build()); for (int i = 0; i < readThreads; ++i) { Reader reader = new Reader(); readers[i] = reader; readPool.execute(reader); } LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port); // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("RpcServer.listener,port=" + port); this.setDaemon(true); }
public void addEvent(Runnable event) { Selector selector = this.selector.get(); if ( selector != null ) { synchronized (events) { events.add(event); } if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event); if ( isListening() ) selector.wakeup(); } }
public SocketInputStream(SocketChannel socket) throws IOException { this.socket = socket; selector = Selector.open(); socket.register(selector, SelectionKey.OP_READ); buffer = ByteBuffer.allocateDirect(4096); buffer.limit(0); }
private void waitForWriteBufferToDrain() throws IOException { if (selector == null) { selector = Selector.open(); } SelectionKey key = socket.register(selector, SelectionKey.OP_WRITE); // block until ready for write operations selector.select(); // cancel OP_WRITE selection key.cancel(); // complete cancelling key selector.selectNow(); }
private void register(Selector selector) { if ( registerQueue.isEmpty() ) { return; } Connection c = null; while ((c = registerQueue.poll()) != null) { try { c.register(selector); } catch (Exception e) { LOGGER.warn("register error ", e); c.close("register err"); } } }
public void initClient(String ip, int port) throws IOException { // 获得一个Socket通道 SocketChannel channel = SocketChannel.open(); // 设置通道为非阻塞 channel.configureBlocking(false); // 获得一个通道管理器 this.selector = Selector.open(); // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调 channel.connect(new InetSocketAddress(ip, port)); channel.register(selector, SelectionKey.OP_CONNECT); }
public NIOAcceptor(String name, String bindIp, int port, int backlog, FrontendConnectionFactory factory, NIOReactorPool reactorPool) throws IOException { super.setName(name); this.port = port; this.selector = Selector.open(); this.serverChannel = ServerSocketChannel.open(); this.serverChannel.configureBlocking(false); //set TCP option serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2); serverChannel.bind(new InetSocketAddress(bindIp, port), backlog); this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); this.factory = factory; this.reactorPool = reactorPool; }
@Override protected void output(byte[] src, int offset, int length) throws IOException { KeyAttachment att = (KeyAttachment) socketWrapper.getSocket().getAttachment(); if (att == null) throw new IOException("Key must be cancelled"); ByteBuffer writeBuffer = socketWrapper.getSocket().getBufHandler().getWriteBuffer(); int thisTime = 0; int written = 0; while (written < length) { int toWrite = Math.min(length - written, writeBuffer.remaining()); writeBuffer.put(src, offset + written, toWrite); writeBuffer.flip(); long writeTimeout = att.getWriteTimeout(); Selector selector = null; try { selector = pool.get(); } catch (IOException x) { // ignore } try { thisTime = pool.write(writeBuffer, socketWrapper.getSocket(), selector, writeTimeout, true); } finally { writeBuffer.clear(); if (selector != null) pool.put(selector); } written += thisTime; } }
public ShadowsocksTunnel(ShadowsocksConfig config,Selector selector) throws Exception { super(config.ServerAddress,selector); if(config.Encryptor==null){ throw new Exception("Error: The Encryptor for ShadowsocksTunnel is null."); } m_Config=config; m_Encryptor=config.Encryptor; }
private Selector openSelector() { Selector result = null; // 在linux平台,尽量启用epoll实现 if (isLinuxPlatform()) { try { final Class<?> providerClazz = Class.forName("sun.nio.ch.EPollSelectorProvider"); if (providerClazz != null) { final Method method = providerClazz.getMethod("provider"); if (method != null) { final SelectorProvider selectorProvider = (SelectorProvider) method.invoke(null); if (selectorProvider != null) { result = selectorProvider.openSelector(); } } } } catch (final Exception ignored) { } } if (result == null) { try { result = SelectorProvider.provider().openSelector(); } catch (IOException e) { throw new NioException("open selector error:" + e.getMessage(), e); } } return result; }
/** * Takes one selector from end of LRU list of free selectors. * If there are no selectors awailable, it creates a new selector. * Also invokes trimIdleSelectors(). * * @param channel * @return * @throws IOException */ private synchronized SelectorInfo get(SelectableChannel channel) throws IOException { SelectorInfo selInfo = null; SelectorProvider provider = channel.provider(); // pick the list : rarely there is more than one provider in use. ProviderInfo pList = providerList; while (pList != null && pList.provider != provider) { pList = pList.next; } if (pList == null) { //LOG.info("Creating new ProviderInfo : " + provider.toString()); pList = new ProviderInfo(); pList.provider = provider; pList.queue = new LinkedList<SelectorInfo>(); pList.next = providerList; providerList = pList; } LinkedList<SelectorInfo> queue = pList.queue; if (queue.isEmpty()) { Selector selector = provider.openSelector(); selInfo = new SelectorInfo(); selInfo.selector = selector; selInfo.queue = queue; } else { selInfo = queue.removeLast(); } trimIdleSelectors(Time.now()); return selInfo; }
public Tunnel(InetSocketAddress serverAddress, Selector selector) throws IOException { SocketChannel innerChannel = SocketChannel.open(); innerChannel.configureBlocking(false); this.m_InnerChannel = innerChannel; this.m_Selector = selector; this.m_ServerEP = serverAddress; SessionCount++; }
private int fillReadBuffer(boolean block) throws IOException { int nRead; if (block) { Selector selector = null; try { selector = pool.get(); } catch (IOException x) { // Ignore } try { NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment) nioChannel.getAttachment(); if (att == null) { throw new IOException("Key must be cancelled."); } nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(), nioChannel, selector, att.getTimeout()); } catch (EOFException eof) { nRead = -1; } finally { if (selector != null) { pool.put(selector); } } } else { nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer()); } return nRead; }
void start(SocketChannel channel) throws IOException { selector = Selector.open(); channel.register(selector, SelectionKey.OP_READ); if (!running) { running = true; thread = new Thread(this, TAG); thread.setPriority(Process.THREAD_PRIORITY_BACKGROUND); thread.start(); } else { if (selector != null) { selector.wakeup(); } } }
protected void socketTimeouts() { long now = System.currentTimeMillis(); if ( (now-lastCheck) < getSelectorTimeout() ) return; //timeout Selector tmpsel = this.selector.get(); Set<SelectionKey> keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null; if ( keys == null ) return; for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext();) { SelectionKey key = iter.next(); try { // if (key.interestOps() == SelectionKey.OP_READ) { // //only timeout sockets that we are waiting for a read from // ObjectReader ka = (ObjectReader) key.attachment(); // long delta = now - ka.getLastAccess(); // if (delta > (long) getTimeout()) { // cancelledKey(key); // } // } // else if ( key.interestOps() == 0 ) { //check for keys that didn't make it in. ObjectReader ka = (ObjectReader) key.attachment(); if ( ka != null ) { long delta = now - ka.getLastAccess(); if (delta > getTimeout() && (!ka.isAccessed())) { if (log.isWarnEnabled()) log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used, perform thread dump"); ka.setLastAccess(now); //key.interestOps(SelectionKey.OP_READ); }//end if } else { cancelledKey(key); }//end if }//end if }catch ( CancelledKeyException ckx ) { cancelledKey(key); } } lastCheck = System.currentTimeMillis(); }
public void initChannel() throws Exception{ channel = ServerSocketChannel.open(); channel.socket().bind(new InetSocketAddress(port)); channel.configureBlocking(false); selector = Selector.open(); channel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("NIO TCP Push Listener nio provider: "+selector.provider().getClass().getCanonicalName()); }
/** * 传递客户端连接的SocketChannel,然后用非阻塞模式进行读事件和写事件的相应 * * @param socketChannel 客户端SocketChannel * @throws IOException */ public Talk(SocketChannel socketChannel,Agent agent) throws IOException { this.socketChannel = socketChannel; selector = Selector.open(); ByteBuffer buffer = ByteBuffer.allocate(2048); //创建用于存放用户发来的数据的缓冲区,并将其作为通道附件的形式进行保存 socketChannel.configureBlocking(false);//设置非阻塞模式 //向Selector注册读就绪事件和写就绪事件,以便响应客户端发来的数据 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); this.agent = agent; }
SelectorManager(HttpClientImpl ref) throws IOException { super(null, null, "SelectorManager", 0, false); ownerRef = new WeakReference<>(ref); readyList = new ArrayList<>(); registrations = new ArrayList<>(); selector = Selector.open(); }
/** * Reinit server on some port * * @param port port address * @throws IOException */ private void portInit(int port) throws IOException { this.port = port; sel = Selector.open(); server = ServerSocketChannel.open(); server.configureBlocking(false); if (serverConsole instanceof StupidConsole) //remote server side server.socket().bind( new InetSocketAddress(InetAddress.getLocalHost(), port) ); else //localhost side server.socket().bind( new InetSocketAddress("localhost", port) ); cancellsed.set(false); //if main chat haven't initialized yet String aDefault = chats.keySet() .stream() .filter(s -> s.equals("default")) .findAny() .orElse(null); if (aDefault == null) { chats.put("default", new StringBuilder("")); } }
@Override public void onCreate() { // registerNetReceiver(); super.onCreate(); isRunning = true; setupHostFile(); setupVPN(); try { udpSelector = Selector.open(); tcpSelector = Selector.open(); deviceToNetworkUDPQueue = new ConcurrentLinkedQueue<>(); deviceToNetworkTCPQueue = new ConcurrentLinkedQueue<>(); networkToDeviceQueue = new ConcurrentLinkedQueue<>(); udpSelectorLock=new ReentrantLock(); tcpSelectorLock=new ReentrantLock(); executorService = Executors.newFixedThreadPool(5); executorService.submit(new UDPInput(networkToDeviceQueue, udpSelector, udpSelectorLock)); executorService.submit(new UDPOutput(deviceToNetworkUDPQueue, networkToDeviceQueue, udpSelector,udpSelectorLock, this)); executorService.submit(new TCPInput(networkToDeviceQueue, tcpSelector,tcpSelectorLock)); executorService.submit(new TCPOutput(deviceToNetworkTCPQueue, networkToDeviceQueue, tcpSelector,tcpSelectorLock, this)); executorService.submit(new VPNRunnable(vpnInterface.getFileDescriptor(), deviceToNetworkUDPQueue, deviceToNetworkTCPQueue, networkToDeviceQueue)); LocalBroadcastManager.getInstance(this).sendBroadcast(new Intent(BROADCAST_VPN_STATE).putExtra("running", true)); Log.i(TAG, "Started"); } catch (IOException e) { // TODO: Here and elsewhere, we should explicitly notify the user of any errors // and suggest that they stop the service, since we can't do it ourselves Log.e(TAG, "Error starting service", e); cleanup(); } }
public SelectorThread(SelectorConfig sc, IMMOExecutor<T> executor, IPacketHandler<T> packetHandler, IClientFactory<T> clientFactory, IAcceptFilter acceptFilter) throws IOException { super.setName("SelectorThread-" + super.getId()); HELPER_BUFFER_SIZE = sc.HELPER_BUFFER_SIZE; HELPER_BUFFER_COUNT = sc.HELPER_BUFFER_COUNT; MAX_SEND_PER_PASS = sc.MAX_SEND_PER_PASS; MAX_READ_PER_PASS = sc.MAX_READ_PER_PASS; SLEEP_TIME = sc.SLEEP_TIME; TCP_NODELAY = sc.TCP_NODELAY; DIRECT_WRITE_BUFFER = ByteBuffer.allocateDirect(sc.WRITE_BUFFER_SIZE).order(BYTE_ORDER); WRITE_BUFFER = ByteBuffer.wrap(new byte[sc.WRITE_BUFFER_SIZE]).order(BYTE_ORDER); READ_BUFFER = ByteBuffer.wrap(new byte[sc.READ_BUFFER_SIZE]).order(BYTE_ORDER); STRING_BUFFER = new NioNetStringBuffer(64 * 1024); _pendingClose = new NioNetStackList<>(); _bufferPool = new LinkedList<>(); for (int i = 0; i < HELPER_BUFFER_COUNT; i++) { _bufferPool.addLast(ByteBuffer.wrap(new byte[HELPER_BUFFER_SIZE]).order(BYTE_ORDER)); } _acceptFilter = acceptFilter; _packetHandler = packetHandler; _clientFactory = clientFactory; _executor = executor; _selector = Selector.open(); }
private int doWriteInternal(boolean block, byte[] b, int off, int len) throws IOException { channel.getBufHandler().getWriteBuffer().clear(); channel.getBufHandler().getWriteBuffer().put(b, off, len); channel.getBufHandler().getWriteBuffer().flip(); int written = 0; NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment) channel.getAttachment(); if (att == null) { throw new IOException("Key must be cancelled"); } long writeTimeout = att.getWriteTimeout(); Selector selector = null; try { selector = pool.get(); } catch (IOException x) { // ignore } try { written = pool.write(channel.getBufHandler().getWriteBuffer(), channel, selector, writeTimeout, block); } finally { if (selector != null) { pool.put(selector); } } if (written < len) { channel.getPoller().add(channel, SelectionKey.OP_WRITE); } return written; }
public static void main(String[] args) throws Exception { final Selector sel = Selector.open(); Runnable r = new Runnable() { public void run() { try { sel.select(); } catch (IOException x) { x.printStackTrace(); } catch (ClosedSelectorException y) { System.err.println ("Caught expected ClosedSelectorException"); } } }; // start thread to block in Selector Thread t = new Thread(r); t.start(); // give thread time to start Thread.sleep(1000); // interrupt, close, and wakeup is the magic sequence to provoke the NPE t.interrupt(); sel.close(); sel.wakeup(); }
public void addEvent(Runnable event) { Selector selector = this.selector.get(); if (selector != null) { synchronized (events) { events.add(event); } if (log.isTraceEnabled()) log.trace("Adding event to selector:" + event); if (isListening()) selector.wakeup(); } }
protected void bind() throws IOException { // allocate an unbound server socket channel serverChannel = ServerSocketChannel.open(); // Get the associated ServerSocket to bind it with ServerSocket serverSocket = serverChannel.socket(); // create a new Selector for use below synchronized (Selector.class) { // Selector.open() isn't thread safe // http://bugs.sun.com/view_bug.do?bug_id=6427854 // Affects 1.6.0_29, fixed in 1.7.0_01 this.selector.set(Selector.open()); } // set the port the server channel will listen to // serverSocket.bind(new InetSocketAddress(getBind(), // getTcpListenPort())); bind(serverSocket, getPort(), getAutoBind()); // set non-blocking mode for the listening socket serverChannel.configureBlocking(false); // register the ServerSocketChannel with the Selector serverChannel.register(this.selector.get(), SelectionKey.OP_ACCEPT); // set up the datagram channel if (this.getUdpPort() > 0) { datagramChannel = DatagramChannel.open(); configureDatagraChannel(); // bind to the address to avoid security checks bindUdp(datagramChannel.socket(), getUdpPort(), getAutoBind()); } }
public void stopIt() { try { for (Map.Entry<Selector, SelectionTask> entry : selTasksMap.entrySet()) { entry.getValue().stopIt(); entry.getValue().selector.wakeup(); } } catch (Throwable t) { t.printStackTrace(); } }
private void handleTimeout(){ Selector tmpsel = selector; Set keys = (stoped == false && tmpsel!=null)?tmpsel.keys():null; if ( keys == null ) { return; } Iterator it = keys.iterator(); long now = System.currentTimeMillis(); //cancel timeout and no interestOps keys,close socket and channel while(it.hasNext()){ SelectionKey key = (SelectionKey) it.next(); if(key.channel() instanceof ServerSocketChannel){ continue; } if(key.isValid() == false){ continue; } try{ MessengerTask task = (MessengerTask)key.attachment(); if(task == null){ cancelKey(key); continue; } if(task.isWritePending() == false && now - task.getLastActive() > sockTimout){ cancelKey(key); } }catch(CancelledKeyException e){ cancelKey(key); } } }