public void acceptCallback(SelectableChannel channel) { // accept the connection assert channel == serverSocket; SocketChannel client; try { client = serverSocket.accept(); } catch (IOException e) { throw new RuntimeException(e); } assert client != null; // wrap it in a message connection and register with event loop NIOMessageConnection connection = new NIOMessageConnection(client); connection.setBigEndian(); this.eventLoop.registerRead(client, new ClientConnectionHandler(connection)); this.numConnections.incrementAndGet(); }
@Override public void acceptCallback(SelectableChannel channel) { // accept the connection assert channel == serverSocket; SocketChannel client; try { client = serverSocket.accept(); } catch (IOException e) { throw new RuntimeException(e); } assert client != null; // wrap it in a message connection and register with event loop ProtoConnection connection = new ProtoConnection(new NonBlockingConnection(client)); eventLoop.registerRead(client, new EventCallbackWrapper(connection)); // SelectionKey clientKey = connection.register(selector); // clientKey.attach(connection); // eventQueue.add(new Event(connection, null)); }
private void addInterest(SelectableChannel channel, int operation, Handler callback) { // TODO: Support multiple handlers? SelectionKey key = channel.keyFor(selector); if (key != null) { assert (key.interestOps() & operation) == 0; if (key.attachment() == null) { key.attach(callback); } else { assert callback == key.attachment(); } key.interestOps(key.interestOps() | operation); // TODO: This fixes a synchronization issue where one thread changes the interest set // of a thread while another thread is blocked in select(), because the Selector // documentation states that it waits for events registered "as of the moment that the // selection operation began. Is there a better fix? selector.wakeup(); } else { register(channel, operation, callback); } }
/**
 * Builds the message used when waiting for a channel to become ready times out.
 *
 * @param channel channel that was being waited on (appended via string conversion)
 * @param timeout timeout in milliseconds
 * @param ops     the {@link SelectionKey} operation waited for; read, write and
 *                connect are named, any other value is printed as its number
 * @return the formatted timeout message
 */
private static String timeoutExceptionString(SelectableChannel channel, long timeout, int ops) {
    String waitingFor;
    if (ops == SelectionKey.OP_READ) {
        waitingFor = "read";
    } else if (ops == SelectionKey.OP_WRITE) {
        waitingFor = "write";
    } else if (ops == SelectionKey.OP_CONNECT) {
        waitingFor = "connect";
    } else {
        waitingFor = "" + ops;
    }

    StringBuilder message = new StringBuilder();
    message.append(timeout).append(" millis timeout while ");
    message.append("waiting for channel to be ready for ").append(waitingFor);
    message.append(". ch : ").append(channel);
    return message.toString();
}
/** * In the case we are using the java select() method, this method is used to * trash the buggy selector and create a new one, registering all the * sockets on it. */ @Override protected void registerNewSelector() throws IOException { synchronized (selector) { Set<SelectionKey> keys = selector.keys(); // Open a new selector Selector newSelector = Selector.open(); // Loop on all the registered keys, and register them on the new selector for (SelectionKey key : keys) { SelectableChannel ch = key.channel(); // Don't forget to attache the session, and back ! NioSession session = (NioSession) key.attachment(); SelectionKey newKey = ch.register(newSelector, key.interestOps(), session); session.setSelectionKey(newKey); } // Now we can close the old selector and switch it selector.close(); selector = newSelector; } }
private void handleIOException( SelectionKey key, WebSocket conn, IOException ex ) { // onWebsocketError( conn, ex );// conn may be null here if( conn != null ) { conn.closeConnection( CloseFrame.ABNORMAL_CLOSE, ex.getMessage() ); } else if( key != null ) { SelectableChannel channel = key.channel(); if( channel != null && channel.isOpen() ) { // this could be the case if the IOException ex is a SSLException try { channel.close(); } catch ( IOException e ) { // there is nothing that must be done here } if( WebSocketImpl.DEBUG ) System.out.println( "Connection closed because of" + ex ); } } }
private void handleIOException(SelectionKey key, WebSocket conn, IOException ex) { // onWebsocketError( conn, ex );// conn may be null here if (conn != null) { conn.closeConnection(CloseFrame.ABNORMAL_CLOSE, ex.getMessage()); } else if (key != null) { SelectableChannel channel = key.channel(); if (channel != null && channel.isOpen()) { // this could be the case if the IOException ex is a SSLException try { channel.close(); } catch (IOException e) { // there is nothing that must be done here } if (WebSocketImpl.DEBUG) { System.out.println("Connection closed because of" + ex); } } } }
@Override public void readCallback(SelectableChannel channel) { try { read(this); } catch (RuntimeException ex) { if (ex.getCause() instanceof IOException) { // Ignore this if (LOG.isDebugEnabled()) LOG.warn("Client connection closed unexpectedly", ex); } else { throw ex; } } }
@Override public void registerRead(SelectableChannel channel, Handler handler) { // Disallow both being registered for read events and connection events at the same time. // On Linux, when a connect fails, the socket is ready for both events, which causes // errors when reads are attempted on the closed socket. assert channel.keyFor(selector) == null || (channel.keyFor(selector).interestOps() & SelectionKey.OP_CONNECT) == 0; addInterest(channel, SelectionKey.OP_READ, handler); }
/** * Takes one selector from end of LRU list of free selectors. * If there are no selectors awailable, it creates a new selector. * Also invokes trimIdleSelectors(). * * @param channel * @return * @throws IOException */ private synchronized SelectorInfo get(SelectableChannel channel) throws IOException { SelectorInfo selInfo = null; SelectorProvider provider = channel.provider(); // pick the list : rarely there is more than one provider in use. ProviderInfo pList = providerList; while (pList != null && pList.provider != provider) { pList = pList.next; } if (pList == null) { //LOG.info("Creating new ProviderInfo : " + provider.toString()); pList = new ProviderInfo(); pList.provider = provider; pList.queue = new LinkedList<SelectorInfo>(); pList.next = providerList; providerList = pList; } LinkedList<SelectorInfo> queue = pList.queue; if (queue.isEmpty()) { Selector selector = provider.openSelector(); selInfo = new SelectorInfo(); selInfo.selector = selector; selInfo.queue = queue; } else { selInfo = queue.removeLast(); } trimIdleSelectors(Time.now()); return selInfo; }
@Override public void readCallback(SelectableChannel channel) { boolean isOpen = connection.readAllAvailable(); if (!isOpen) { // TODO: Fail any subsequent RPCs throw new UnsupportedOperationException("Connection closed: not handled (for now)."); } while (true) { RpcResponse.Builder builder = RpcResponse.newBuilder(); boolean success = connection.readBufferedMessage(builder); if (!success) { // TODO: Cache the builder object to reduce garbage? break; } // Set the appropriate flags on the RPC object // TODO: Handle bad sequence number by ignoring/logging? RpcResponse response = builder.build(); ProtoRpcController rpc = null; synchronized (this) { rpc = pendingRpcs.remove(response.getSequenceNumber()); assert response.getStatus() == Protocol.Status.OK; assert rpc != null : "No ProtoRpcController for Sequence# " + response.getSequenceNumber(); } rpc.finishRpcSuccess(response.getResponse()); } }
@Override public void acceptCallback(SelectableChannel channel) { // accept the connection assert client == null; try { client = ((ServerSocketChannel) channel).accept(); } catch (IOException e) { throw new RuntimeException(e); } assert client != null; }
/**
 * Records {@code handler} as the write handler. May only be called once.
 *
 * @param channel channel the handler belongs to (not used by this implementation)
 * @param handler write handler to remember
 * @throws IllegalStateException if a write handler has already been recorded
 */
@Override
public void registerWrite(SelectableChannel channel, Handler handler) {
    if (writeHandler == null) {
        writeHandler = handler;
        return;
    }
    throw new IllegalStateException("Each channel can only call registerWrite() once");
}
protected void implClose() throws IOException { synchronized (closeLock) { if (channelArray != null) { if (pollWrapper != null) { // prevent further wakeup synchronized (interruptLock) { interruptTriggered = true; } wakeupPipe.sink().close(); wakeupPipe.source().close(); for(int i = 1; i < totalChannels; i++) { // Deregister channels if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent deregister(channelArray[i]); SelectableChannel selch = channelArray[i].channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); } } pollWrapper.free(); pollWrapper = null; selectedKeys = null; channelArray = null; // Make all remaining helper threads exit for (SelectThread t: threads) t.makeZombie(); startLock.startThreads(); } } } }
private void unregisterChannel(SelectableChannel channel) { SelectionKey key = channel.keyFor(selector); if (key != null) { // not registered with this selector => null returned key.cancel(); } }
/** * {@inheritDoc} */ @Override protected boolean isBrokenConnection() throws IOException { // A flag set to true if we find a broken session boolean brokenSession = false; synchronized (selector) { // Get the selector keys Set<SelectionKey> keys = selector.keys(); // Loop on all the keys to see if one of them // has a closed channel for (SelectionKey key : keys) { SelectableChannel channel = key.channel(); if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected())) || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) { // The channel is not connected anymore. Cancel // the associated key then. key.cancel(); // Set the flag to true to avoid a selector switch brokenSession = true; } } } return brokenSession; }
/** * Register the given channel with the given selector for * the given operations of interest */ protected void registerChannel(Selector selector, SelectableChannel channel, int ops, Object attach) throws Exception { if (channel == null)return; // could happen // set the new channel non-blocking channel.configureBlocking(false); // register it with the selector channel.register(selector, ops, attach); }
/**
 * Discards all deferred registrations: closes each handler's channel, cancels
 * and detaches its selection key if it has one, then empties the list.
 * An {@link IOException} from closing a channel is only reported through debug
 * output and does not stop the loop.
 */
private void clearDeferredRegistrations() {
    synchronized (deferredRegistrations) {
        final int pending = deferredRegistrations.size();
        if (orb.transportDebugFlag) {
            dprint(".clearDeferredRegistrations:deferred list size == " + pending);
        }

        int index = 0;
        while (index < pending) {
            EventHandler handler = (EventHandler) deferredRegistrations.get(index);
            index++;
            if (orb.transportDebugFlag) {
                dprint(".clearDeferredRegistrations: " + handler);
            }

            SelectableChannel channel = handler.getChannel();
            try {
                if (orb.transportDebugFlag) {
                    dprint(".clearDeferredRegistrations:close channel == " + channel);
                    dprint(".clearDeferredRegistrations:close channel class == " + channel.getClass().getName());
                }
                channel.close();

                SelectionKey key = handler.getSelectionKey();
                if (key != null) {
                    key.cancel();
                    key.attach(null);
                }
            } catch (IOException closeFailure) {
                if (orb.transportDebugFlag) {
                    dprint(".clearDeferredRegistrations: ", closeFailure);
                }
            }
        }

        deferredRegistrations.clear();
    }
}
protected void implDereg(SelectionKeyImpl ski) throws IOException{ int i = ski.getIndex(); assert (i >= 0); synchronized (closeLock) { if (i != totalChannels - 1) { // Copy end one over it SelectionKeyImpl endChannel = channelArray[totalChannels-1]; channelArray[i] = endChannel; endChannel.setIndex(i); pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, pollWrapper, i); } ski.setIndex(-1); } channelArray[totalChannels - 1] = null; totalChannels--; if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { totalChannels--; threadsCount--; // The last thread has become redundant. } fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys keys.remove(ski); selectedKeys.remove(ski); deregister(ski); SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); }
/**
 * Puts the channel into non-blocking mode and registers it with this selector,
 * attaching {@code callback} to the resulting key.
 *
 * @param channel  channel to register
 * @param ops      interest operations
 * @param callback handler attached to the key
 * @throws RuntimeException wrapping any {@link IOException} from configuring or registering
 */
private void register(SelectableChannel channel, int ops, Handler callback) {
    try {
        channel.configureBlocking(false);
        // The returned SelectionKey is intentionally not kept.
        channel.register(selector, ops, callback);
    } catch (IOException registrationFailure) {
        throw new RuntimeException(registrationFailure);
    }
}
/**
 * Removes a key from this selector's bookkeeping and kills its channel if the
 * channel is closed and no longer registered anywhere.
 *
 * @param ski key to deregister
 * @throws IOException declared by the signature
 */
protected void implDereg(SelectionKeyImpl ski) throws IOException {
    // Drop the key from every collection that tracks it.
    channelArray.Remove(ski);
    fdMap.remove(ski.getSocket());
    keys.remove(ski);
    selectedKeys.remove(ski);

    deregister(ski);

    SelectableChannel keyChannel = ski.channel();
    boolean stillInUse = keyChannel.isOpen() || keyChannel.isRegistered();
    if (!stillInUse) {
        ((SelChImpl) keyChannel).kill();
    }
}
/**
 * Dumps every key of the selector to standard error: one line per key showing
 * the channel and its interest ops, framed by start and end marker lines.
 *
 * @param selector selector whose keys are printed
 */
void debugPrint(Selector selector) {
    System.err.println("Selector: debugprint start");
    for (SelectionKey registration : selector.keys()) {
        System.err.printf("selector chan:%s ops:%d\n", registration.channel(), registration.interestOps());
    }
    System.err.println("Selector: debugprint end");
}
/** * Switch this guy to blocking mode so we can use oldIO to read and write msgs. */ public void makeBlocking() throws IOException { // logger.info("DEBUG: makeBlocking " + this); // if (this.sKey != null) { // this.sKey = null; // } SelectableChannel c = this.theSocket.getChannel(); c.configureBlocking(true); }
/**
 * Verifies that {@code channel} is a non-null {@link SelectableChannel}.
 * A mismatch is reported as an {@link IOException} rather than a runtime
 * exception, because it can arise for many runtime reasons; the most common
 * is that the original socket has no channel.
 *
 * @param channel object expected to be a {@code SelectableChannel}
 * @throws IOException if {@code channel} is null or not a {@code SelectableChannel}
 */
static void checkChannelValidity(Object channel) throws IOException {
    if (channel instanceof SelectableChannel) {
        return;
    }

    // Null gets its own message because it usually points at how the socket was created.
    String problem = (channel == null)
            ? "Channel is null. Check " + "how the channel or socket is created."
            : "Channel should be a SelectableChannel";
    throw new IOException(problem);
}
/** * Takes one selector from end of LRU list of free selectors. * If there are no selectors awailable, it creates a new selector. * Also invokes trimIdleSelectors(). * * @param channel * @return * @throws IOException */ private synchronized SelectorInfo get(SelectableChannel channel) throws IOException { SelectorInfo selInfo = null; SelectorProvider provider = channel.provider(); // pick the list : rarely there is more than one provider in use. ProviderInfo pList = providerList; while (pList != null && pList.provider != provider) { pList = pList.next; } if (pList == null) { //LOG.info("Creating new ProviderInfo : " + provider.toString()); pList = new ProviderInfo(); pList.provider = provider; pList.queue = new LinkedList<SelectorInfo>(); pList.next = providerList; providerList = pList; } LinkedList<SelectorInfo> queue = pList.queue; if (queue.isEmpty()) { Selector selector = provider.openSelector(); selInfo = new SelectorInfo(); selInfo.selector = selector; selInfo.queue = queue; } else { selInfo = queue.removeLast(); } trimIdleSelectors(System.currentTimeMillis()); return selInfo; }
/**
 * Registers every deferred event handler's channel with the selector and
 * stores the resulting key on the handler, then empties the deferred list.
 * If a channel is already closed, the handler's key is set to null and the
 * failure is only reported through debug output.
 */
private void handleDeferredRegistrations() {
    synchronized (deferredRegistrations) {
        final int pending = deferredRegistrations.size();
        for (int index = 0; index != pending; index++) {
            EventHandler handler = (EventHandler) deferredRegistrations.get(index);
            if (orb.transportDebugFlag) {
                dprint(".handleDeferredRegistrations: " + handler);
            }

            SelectableChannel channel = handler.getChannel();
            SelectionKey registeredKey;
            try {
                registeredKey = channel.register(selector, handler.getInterestOps(), (Object) handler);
            } catch (ClosedChannelException closed) {
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: " + closed);
                }
                registeredKey = null;
            }
            handler.setSelectionKey(registeredKey);
        }
        deferredRegistrations.clear();
    }
}
/**
 * Attempts to write on the connection and records the outcome in
 * {@code connectionBlocked}.
 *
 * @param channel the channel that became writable (not used directly here)
 * @return the value returned by {@code connection.tryWrite()}; presumably true
 *         while the connection is still blocked on writing (confirm against tryWrite)
 */
@Override
public synchronized boolean writeCallback(SelectableChannel channel) {
    final boolean writeOutcome = connection.tryWrite();
    connectionBlocked = writeOutcome;
    return writeOutcome;
}
/** Returns the underlying channel for registration with a selector. */ // TODO: Remove register()? public SelectableChannel getChannel();
/**
 * Constructor provided mostly for unit tests: the selectable channel and the
 * byte channel used for the actual I/O are supplied separately.
 *
 * @param selectable channel stored as this connection's selectable channel
 * @param channel    byte channel the read and write streams are built on
 */
public NonBlockingConnection(SelectableChannel selectable, ByteChannel channel) {
    this.channel = selectable;
    this.read = new NIOReadStream(channel);
    this.write = new NIOWriteStream(channel);
}
/**
 * Returns the underlying channel for registration with a selector.
 *
 * @return the {@code channel} field of this object
 */
public SelectableChannel getChannel() {
    return this.channel;
}
/**
 * Creates a CPSink by forwarding all arguments unchanged to the superclass
 * constructor.
 *
 * @param t            NIO thread handed to the superclass
 * @param c            selectable channel handed to the superclass
 * @param clientSocket passed through as-is; presumably marks a client-side
 *                     socket (TODO confirm against the superclass)
 */
public CPSink(NioThread t, SelectableChannel c, boolean clientSocket) { super(t, c, clientSocket); }